#!/usr/bin/env python3
import sys
import json
import os
import duckdb
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
import re

def main():
    if len(sys.argv) < 2:
        print("Usage: phrase_modeling_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)

def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'phrase_models/category=comedy/date=2020-01-01' into components."""
    match = re.match(r'phrase_models/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}

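# Illustrative usage of parse_partition_ref (values taken from the docstring
# example above):
#
#   parse_partition_ref("phrase_models/category=comedy/date=2020-01-01")
#   # -> {"category": "comedy", "date": "2020-01-01"}
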
def handle_config(args):
    if len(args) < 1:
        print("Config mode requires at least one partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []

    # Process each partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        # Dependencies: categorized reviews for the same category and date
        categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"

        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str
            }
        })

    config = {"configs": configs}
    print(json.dumps(config))

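# Shape of the JSON printed on stdout for a single partition ref, following
# the fields assembled above (values illustrative):
#
#   {"configs": [{
#       "outputs": [{"str": "phrase_models/category=comedy/date=2020-01-01"}],
#       "inputs": [{"dep_type_code": 1, "dep_type_name": "materialize",
#                   "partition_ref": {"str": "categorized_reviews/category=comedy/date=2020-01-01"}}],
#       "args": ["comedy", "2020-01-01"],
#       "env": {"PARTITION_REF": "phrase_models/category=comedy/date=2020-01-01",
#               "TARGET_CATEGORY": "comedy", "TARGET_DATE": "2020-01-01"}}]}
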
def handle_exec(args):
    # Optional random startup jitter, controlled by EXEC_SLEEP (useful when
    # exercising concurrent executions in tests).
    import time
    import random
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)

    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv('PARTITION_REF', f'phrase_models/category={target_category}/date={target_date}')

    # Input path
    categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"

    # Check that the input file exists
    if not os.path.exists(categorized_reviews_file):
        print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
        sys.exit(1)

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/phrase_models/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "phrase_models.parquet"

    # Capture the start time before extraction so the manifest records the
    # actual span of the work, not two identical timestamps.
    start_time = datetime.now().isoformat()

    try:
        # Extract phrases using phrase modeling
        extract_phrases_for_category_date(categorized_reviews_file, target_category, target_date, str(output_file))

        print(f"Successfully extracted phrases for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"categorized_reviews/category={target_category}/date={target_date}"}
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:phrase_modeling_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        print(f"Error extracting phrases: {e}", file=sys.stderr)
        sys.exit(1)

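# Example invocation (values illustrative; the exec env is normally supplied
# by the build system from the config emitted above):
#
#   PARTITION_REF=phrase_models/category=comedy/date=2020-01-01 \
#       ./phrase_modeling_job.py exec comedy 2020-01-01
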
def extract_phrases_for_category_date(categorized_reviews_file: str, target_category: str, target_date: str, output_file: str):
    """Extract phrases from categorized reviews using the phrase binary or simple ngram extraction."""

    # Connect to DuckDB for processing
    duckdb_conn = duckdb.connect()

    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed

        duckdb_conn.execute("LOAD parquet")

        # Check whether the phrase binary is available
        phrase_binary = find_phrase_binary()

        if phrase_binary:
            # Use the external phrase binary
            phrases = extract_with_phrase_binary(categorized_reviews_file, phrase_binary)
        else:
            print("Warning: phrase binary not found, using simple ngram extraction")
            # Fall back to simple ngram extraction
            phrases = extract_simple_ngrams(categorized_reviews_file, duckdb_conn)

        # Convert phrases to structured data and save
        if phrases:
            save_phrases_to_parquet(phrases, target_category, target_date, output_file, duckdb_conn)
        else:
            # Create an empty parquet file with the correct schema
            create_empty_phrase_parquet(target_category, target_date, output_file, duckdb_conn)

    finally:
        duckdb_conn.close()

def find_phrase_binary() -> Optional[str]:
    """Find the phrase binary in common locations."""
    possible_paths = [
        "/usr/local/bin/phrase",
        "/usr/bin/phrase",
        "./phrase",
        "../phrase",
        os.path.expanduser("~/bin/phrase")
    ]

    for path in possible_paths:
        if os.path.exists(path) and os.access(path, os.X_OK):
            return path

    return None

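# A PATH lookup could complement the fixed candidate list above; a minimal
# sketch using the stdlib (not wired in):
#
#   import shutil
#   path = shutil.which("phrase")  # resolved executable path, or None
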
def extract_with_phrase_binary(categorized_reviews_file: str, phrase_binary: str) -> List[Dict[str, Any]]:
    """Extract phrases using the external phrase binary."""

    # Read review content into a temporary file
    duckdb_conn = duckdb.connect()
    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed

        duckdb_conn.execute("LOAD parquet")

        # Extract review content
        content_query = f"SELECT content FROM parquet_scan('{categorized_reviews_file}') WHERE content IS NOT NULL AND content != ''"
        results = duckdb_conn.execute(content_query).fetchall()

        if not results:
            return []

        # Write content to a temporary file, one review per line
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file:
            for (content,) in results:
                temp_file.write(content.strip() + '\n')
            temp_file_path = temp_file.name

        try:
            # Run the phrase binary
            cmd = [phrase_binary, "--input", temp_file_path, "--output-format", "json"]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

            if result.returncode != 0:
                print(f"Phrase binary failed: {result.stderr}", file=sys.stderr)
                return []

            # Parse JSON output
            phrases_data = json.loads(result.stdout) if result.stdout.strip() else []
            return phrases_data

        finally:
            # Clean up the temp file
            os.unlink(temp_file_path)

    finally:
        duckdb_conn.close()

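# The phrase binary is expected to emit a JSON array on stdout; downstream,
# save_phrases_to_parquet reads "ngram", "score", and optionally "hash" from
# each element, so the output presumably looks like (values illustrative):
#
#   [{"ngram": "great show", "score": 36.0, "hash": 123456789}, ...]
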
def extract_simple_ngrams(categorized_reviews_file: str, duckdb_conn) -> List[Dict[str, Any]]:
    """Simple ngram extraction using SQL as a fallback."""

    # Simple phrase extraction in SQL: tokenize review content, form bigrams
    # per podcast (approximating word order with row_number over the unnested
    # tokens), then score frequent phrases by rating polarity.
    query = f"""
        WITH word_tokens AS (
            SELECT
                unnest(string_split(lower(regexp_replace(content, '[^a-zA-Z0-9\\s]', '', 'g')), ' ')) as word,
                podcast_id,
                rating
            FROM parquet_scan('{categorized_reviews_file}')
            WHERE content IS NOT NULL AND content != ''
        ),
        bigrams AS (
            SELECT
                word || ' ' || lead(word) OVER (PARTITION BY podcast_id ORDER BY rowid) as ngram,
                rating
            FROM (SELECT *, row_number() OVER () as rowid FROM word_tokens) t
            WHERE word IS NOT NULL AND word != ''
        ),
        phrase_stats AS (
            SELECT
                ngram,
                COUNT(*) as frequency,
                AVG(rating::FLOAT) as avg_rating,
                MIN(rating) as min_rating,
                MAX(rating) as max_rating
            FROM bigrams
            WHERE ngram IS NOT NULL AND ngram LIKE '% %'  -- keep only true two-word phrases
            GROUP BY ngram
            HAVING COUNT(*) >= 3  -- Only phrases that appear at least 3 times
        )
        SELECT
            ngram,
            frequency,
            avg_rating,
            min_rating,
            max_rating,
            CASE
                WHEN avg_rating >= 4.0 THEN frequency * avg_rating * 0.8
                WHEN avg_rating <= 2.0 THEN frequency * (5.0 - avg_rating) * 0.8
                ELSE frequency * 0.3
            END as score
        FROM phrase_stats
        ORDER BY score DESC
        LIMIT 1000
    """

    try:
        results = duckdb_conn.execute(query).fetchall()

        phrases = []
        for row in results:
            ngram, frequency, avg_rating, min_rating, max_rating, score = row
            phrases.append({
                "ngram": ngram,
                "frequency": frequency,
                "avg_rating": float(avg_rating) if avg_rating is not None else 0.0,
                "min_rating": min_rating,
                "max_rating": max_rating,
                "score": float(score) if score is not None else 0.0,
                # Python's hash() is salted per process, so this identifier is
                # not stable across runs.
                "hash": hash(ngram) % (2**31)  # Simple hash for compatibility
            })

        return phrases

    except Exception as e:
        print(f"Error in simple ngram extraction: {e}", file=sys.stderr)
        return []

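# Worked example of the scoring CASE in extract_simple_ngrams: a bigram seen
# 10 times with an average rating of 4.5 scores 10 * 4.5 * 0.8 = 36.0, while
# the same frequency at a neutral 3.0 average scores only 10 * 0.3 = 3.0, so
# strongly polarized phrases rank first.
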
def save_phrases_to_parquet(phrases: List[Dict[str, Any]], category: str, date: str, output_file: str, duckdb_conn):
    """Save phrases to a parquet file."""

    if not phrases:
        create_empty_phrase_parquet(category, date, output_file, duckdb_conn)
        return

    # Create a temporary table holding the phrases
    duckdb_conn.execute("DROP TABLE IF EXISTS temp_phrases")
    duckdb_conn.execute("""
        CREATE TABLE temp_phrases (
            date VARCHAR,
            category VARCHAR,
            hash BIGINT,
            ngram VARCHAR,
            score DOUBLE
        )
    """)

    # Insert phrase data
    for phrase in phrases:
        duckdb_conn.execute("""
            INSERT INTO temp_phrases VALUES (?, ?, ?, ?, ?)
        """, [
            date,
            category,
            phrase.get("hash", hash(phrase.get("ngram", "")) % (2**31)),
            phrase.get("ngram", ""),
            phrase.get("score", 0.0)
        ])

    # Save to parquet
    duckdb_conn.execute(f"COPY temp_phrases TO '{output_file}' (FORMAT PARQUET)")

    print(f"Saved {len(phrases)} phrases to parquet file")

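# Row-at-a-time INSERTs are fine at this scale (LIMIT 1000 upstream); for
# larger batches, DuckDB's executemany would cut per-statement overhead.
# A sketch under that assumption:
#
#   rows = [(date, category, p.get("hash", 0), p.get("ngram", ""), p.get("score", 0.0))
#           for p in phrases]
#   duckdb_conn.executemany("INSERT INTO temp_phrases VALUES (?, ?, ?, ?, ?)", rows)
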
def create_empty_phrase_parquet(category: str, date: str, output_file: str, duckdb_conn):
    """Create an empty parquet file with the correct schema."""

    duckdb_conn.execute("DROP TABLE IF EXISTS empty_phrases")
    duckdb_conn.execute("""
        CREATE TABLE empty_phrases (
            date VARCHAR,
            category VARCHAR,
            hash BIGINT,
            ngram VARCHAR,
            score DOUBLE
        )
    """)

    duckdb_conn.execute(f"COPY empty_phrases TO '{output_file}' (FORMAT PARQUET)")
    print("Created empty phrase models file (no phrases extracted)")

if __name__ == "__main__":
    main()