Stuart Axelbrooke 2025-06-30 22:15:48 -07:00
parent 196622fe17
commit 55c404ca2e
16 changed files with 3481 additions and 15 deletions

View file

@@ -187,7 +187,7 @@
},
"@@pybind11_bazel+//:python_configure.bzl%extension": {
"general": {
"bzlTransitiveDigest": "d4N/SZrl3ONcmzE98rcV0Fsro0iUbjNQFTIiLiGuH+k=",
"bzlTransitiveDigest": "OMjJ8aOAn337bDg7jdyvF/juIrC2PpUcX6Dnf+nhcF0=",
"usagesDigest": "fycyB39YnXIJkfWCIXLUKJMZzANcuLy9ZE73hRucjFk=",
"recordedFileInputs": {
"@@pybind11_bazel+//MODULE.bazel": "88af1c246226d87e65be78ed49ecd1e6f5e98648558c14ce99176da041dc378e"
@@ -221,7 +221,7 @@
},
"@@rules_fuzzing+//fuzzing/private:extensions.bzl%non_module_dependencies": {
"general": {
"bzlTransitiveDigest": "mGiTB79hRNjmeDTQdzkpCHyzXhErMbufeAmySBt7s5s=",
"bzlTransitiveDigest": "lxvzPQyluk241QRYY81nZHOcv5Id/5U2y6dp42qibis=",
"usagesDigest": "wy6ISK6UOcBEjj/mvJ/S3WeXoO67X+1llb9yPyFtPgc=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},
@@ -525,7 +525,7 @@
},
"@@rules_python+//python/private/pypi:pip.bzl%pip_internal": {
"general": {
"bzlTransitiveDigest": "sCGUUdVOVATRPlKd1IJea1kfLmtsYsAZdVI5HkdAUQo=",
"bzlTransitiveDigest": "bKQjDjomeeeh547JZoDNozPUkVrO368PlWs0shDGtJU=",
"usagesDigest": "OLoIStnzNObNalKEMRq99FqenhPGLFZ5utVLV4sz7OI=",
"recordedFileInputs": {
"@@rules_python+//tools/publish/requirements_darwin.txt": "2994136eab7e57b083c3de76faf46f70fad130bc8e7360a7fed2b288b69e79dc",
@@ -4171,7 +4171,7 @@
},
"@@rules_rust+//crate_universe/private:internal_extensions.bzl%cu_nr": {
"general": {
"bzlTransitiveDigest": "7DC2ciVAva/LfjqxrbJs5WDxaCDqDaPY4HXXZriW120=",
"bzlTransitiveDigest": "mDJ0pT/rBCHMm7FzlOzh9Qng+sXi1kyQXEU8TahWqRc=",
"usagesDigest": "Pr9/2PR9/ujuo94SXikpx+fg31V4bDKobC10YJu+z5I=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},

View file

@@ -280,8 +280,8 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
let outputs_strs: Vec<String> = task.config.outputs.iter().map(|o| o.str.clone()).collect();
let outputs_key = outputs_strs.join("_");
let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
job_node_id = job_node_id.replace(":", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_"));
job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
// Create a descriptive label that includes both job label and outputs
let job_label = &task.job_label;
@@ -309,7 +309,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Process inputs (dependencies)
for input in &task.config.inputs {
let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_"));
let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
@@ -323,7 +323,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
mermaid.push_str(&format!(
" {}[(\"{}\")]:::{}\n",
ref_node_id,
input.partition_ref.str,
input.partition_ref.str.replace("/", "_").replace("=", "_"),
node_class
));
added_refs.insert(ref_node_id.clone());
@@ -341,7 +341,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Process outputs
for output in &task.config.outputs {
let ref_node_id = format!("ref_{}", output.str.replace("/", "_"));
let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {

View file

@@ -1,5 +1,5 @@
load("//:py_repl.bzl", "py_repl")
load("@databuild//databuild:rules.bzl", "databuild_job")
load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")
load("@rules_python//python:pip.bzl", "compile_pip_requirements")
load("@pypi//:requirements.bzl", "requirement")
@@ -9,6 +9,142 @@ compile_pip_requirements(
requirements_txt = "requirements_lock.txt",
)
# Podcast Reviews Graph
databuild_graph(
name = "podcast_reviews_graph",
jobs = [
":extract_reviews_job",
":extract_podcasts_job",
":categorize_reviews_job",
":phrase_modeling_job",
":phrase_stats_job",
":daily_summary_job",
],
lookup = ":job_lookup",
visibility = ["//visibility:public"],
)
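# job_lookup maps requested partition refs (e.g. "reviews/date=2020-01-01",
# "podcasts/all") to the job target that produces them; see job_lookup.py.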
py_binary(
name = "job_lookup",
srcs = ["job_lookup.py"],
main = "job_lookup.py",
)
# Extract Reviews Job
databuild_job(
name = "extract_reviews_job",
binary = ":extract_reviews_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "extract_reviews_binary",
srcs = ["extract_reviews_job.py", "duckdb_utils.py"],
main = "extract_reviews_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Extract Podcasts Job
databuild_job(
name = "extract_podcasts_job",
binary = ":extract_podcasts_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "extract_podcasts_binary",
srcs = ["extract_podcasts_job.py", "duckdb_utils.py"],
main = "extract_podcasts_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Categorize Reviews Job
databuild_job(
name = "categorize_reviews_job",
binary = ":categorize_reviews_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "categorize_reviews_binary",
srcs = ["categorize_reviews_job.py", "duckdb_utils.py"],
main = "categorize_reviews_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Phrase Modeling Job
databuild_job(
name = "phrase_modeling_job",
binary = ":phrase_modeling_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "phrase_modeling_binary",
srcs = ["phrase_modeling_job.py", "duckdb_utils.py"],
main = "phrase_modeling_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Phrase Stats Job
databuild_job(
name = "phrase_stats_job",
binary = ":phrase_stats_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "phrase_stats_binary",
srcs = ["phrase_stats_job.py", "duckdb_utils.py"],
main = "phrase_stats_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Daily Summary Job
databuild_job(
name = "daily_summary_job",
binary = ":daily_summary_binary",
visibility = ["//visibility:public"],
)
py_binary(
name = "daily_summary_binary",
srcs = ["daily_summary_job.py", "duckdb_utils.py"],
main = "daily_summary_job.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
# Legacy test job (kept for compatibility)
databuild_job(
name = "test_job",
binary = ":test_job_binary",
@@ -20,6 +156,29 @@ py_binary(
main = "unified_job.py",
)
# Test target
py_binary(
name = "test_jobs",
srcs = [
"test_jobs.py",
"extract_reviews_job.py",
"extract_podcasts_job.py",
"categorize_reviews_job.py",
"phrase_modeling_job.py",
"phrase_stats_job.py",
"daily_summary_job.py",
"job_lookup.py",
"duckdb_utils.py",
],
main = "test_jobs.py",
deps = [
requirement("duckdb"),
requirement("pydantic"),
requirement("pandas"),
requirement("pyarrow"),
],
)
py_repl(
name = "repl",
deps = [

File diff suppressed because one or more lines are too long

View file

@@ -21,3 +21,7 @@ flowchart LR
## Input Data
Get it from [here](https://www.kaggle.com/datasets/thoughtvector/podcastreviews/versions/28?select=database.sqlite)! (and put it in `examples/podcast_reviews/data/ingest/database.sqlite`)
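The jobs in this example read inputs and write outputs under `/tmp/databuild_test/examples/podcast_reviews/` (see the individual `*_job.py` scripts); roughly, a build produces the layout sketched below, with a `manifest.json` next to each output:

```
/tmp/databuild_test/examples/podcast_reviews/
  data/ingest/database.sqlite
  reviews/date=YYYY-MM-DD/reviews.parquet
  podcasts/podcasts.parquet
  categorized_reviews/category=<cat>/date=YYYY-MM-DD/categorized_reviews.parquet
  phrase_models/category=<cat>/date=YYYY-MM-DD/phrase_models.parquet
  phrase_stats/category=<cat>/date=YYYY-MM-DD/phrase_stats.parquet
  daily_summaries/category=<cat>/date=YYYY-MM-DD/daily_summary.parquet
```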
## `phrase` Dependency
This relies on [`soaxelbrooke/phrase`](https://github.com/soaxelbrooke/phrase) for phrase extraction; grab a binary for your platform from its [releases](https://github.com/soaxelbrooke/phrase/releases) page.
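If no binary is found (the job checks `/usr/local/bin/phrase`, `/usr/bin/phrase`, `./phrase`, `../phrase`, and `~/bin/phrase`), `phrase_modeling_job.py` falls back to a simple SQL bigram extraction. For reference, the binary is invoked roughly as sketched here (mirroring `extract_with_phrase_binary`; `reviews.txt` stands in for the temp file of review text the job writes):

```python
import json
import subprocess

# One review per line; the job writes this file from the categorized_reviews parquet.
result = subprocess.run(
    ["/usr/local/bin/phrase", "--input", "reviews.txt", "--output-format", "json"],
    capture_output=True, text=True, timeout=300,
)
phrases = json.loads(result.stdout) if result.stdout.strip() else []
```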

View file

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
import sys
import json
import os
import duckdb
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import re
def main():
if len(sys.argv) < 2:
print("Usage: categorize_reviews_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
"""Parse partition ref like 'categorized_reviews/category=comedy/date=2020-01-01' into components."""
match = re.match(r'categorized_reviews/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
if not match:
raise ValueError(f"Invalid partition ref format: {partition_ref}")
return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: reviews for the date and podcast metadata
reviews_ref = f"reviews/date={date_str}"
podcasts_ref = "podcasts/all"
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": reviews_ref}},
{"dep_type": 1, "partition_ref": {"str": podcasts_ref}}
],
"args": [category, date_str],
"env": {
"PARTITION_REF": partition_ref,
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
print(json.dumps(config))
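# For reference, with partition ref "categorized_reviews/category=comedy/date=2020-01-01"
# the config printed above expands to (env values elided):
#   {"configs": [{"outputs": [{"str": "categorized_reviews/category=comedy/date=2020-01-01"}],
#                 "inputs": [{"dep_type": 1, "partition_ref": {"str": "reviews/date=2020-01-01"}},
#                            {"dep_type": 1, "partition_ref": {"str": "podcasts/all"}}],
#                 "args": ["comedy", "2020-01-01"],
#                 "env": {...}}]}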
def handle_exec(args):
if len(args) < 2:
print("Exec mode requires category and date arguments", file=sys.stderr)
sys.exit(1)
target_category = args[0]
target_date = args[1]
partition_ref = os.getenv('PARTITION_REF', f'categorized_reviews/category={target_category}/date={target_date}')
# Input paths
reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}/reviews.parquet"
podcasts_file = "/tmp/databuild_test/examples/podcast_reviews/podcasts/podcasts.parquet"
# Check input files exist
if not os.path.exists(reviews_file):
print(f"Reviews file not found: {reviews_file}", file=sys.stderr)
sys.exit(1)
if not os.path.exists(podcasts_file):
print(f"Podcasts file not found: {podcasts_file}", file=sys.stderr)
sys.exit(1)
# Output path
output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "categorized_reviews.parquet"
try:
# Categorize reviews by joining with podcast metadata
categorize_reviews_for_category_date(reviews_file, podcasts_file, target_category, str(output_file))
print(f"Successfully categorized reviews for category {target_category} on {target_date}")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [
{"str": f"reviews/date={target_date}"},
{"str": "podcasts/all"}
],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:categorize_reviews_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"reviews/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": "podcasts/all"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error categorizing reviews: {e}", file=sys.stderr)
sys.exit(1)
def categorize_reviews_for_category_date(reviews_file: str, podcasts_file: str, target_category: str, output_file: str):
"""Join reviews with podcast categories and filter for target category."""
# Connect to DuckDB for processing
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Query to join reviews with podcasts and filter by category
query = f"""
SELECT
r.podcast_id,
r.review_title,
r.content,
r.rating,
r.author_id,
r.created_at,
r.review_date,
p.title as podcast_title,
p.primary_category,
p.all_categories,
'{target_category}' as target_category
FROM parquet_scan('{reviews_file}') r
JOIN parquet_scan('{podcasts_file}') p ON r.podcast_id = p.podcast_id
WHERE p.primary_category = '{target_category}'
OR p.all_categories LIKE '%{target_category}%'
ORDER BY r.created_at
"""
# Execute query and save to parquet
duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")
# Get row count for logging
count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
row_count = count_result[0] if count_result else 0
print(f"Categorized {row_count} reviews for category '{target_category}'")
if row_count == 0:
print(f"Warning: No reviews found for category '{target_category}' on date '{reviews_file.split('date=')[1].split('/')[0]}'")
finally:
duckdb_conn.close()
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,312 @@
#!/usr/bin/env python3
import sys
import json
import os
import duckdb
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import re
def main():
if len(sys.argv) < 2:
print("Usage: daily_summary_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
"""Parse partition ref like 'daily_summaries/category=comedy/date=2020-01-01' into components."""
match = re.match(r'daily_summaries/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
if not match:
raise ValueError(f"Invalid partition ref format: {partition_ref}")
return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase stats and categorized reviews for the category and date
phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}},
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
"PARTITION_REF": partition_ref,
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
print(json.dumps(config))
def handle_exec(args):
if len(args) < 2:
print("Exec mode requires category and date arguments", file=sys.stderr)
sys.exit(1)
target_category = args[0]
target_date = args[1]
partition_ref = os.getenv('PARTITION_REF', f'daily_summaries/category={target_category}/date={target_date}')
# Input paths
phrase_stats_file = f"/tmp/databuild_test/examples/podcast_reviews/phrase_stats/category={target_category}/date={target_date}/phrase_stats.parquet"
categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"
# Check input files exist
if not os.path.exists(phrase_stats_file):
print(f"Phrase stats file not found: {phrase_stats_file}", file=sys.stderr)
sys.exit(1)
if not os.path.exists(categorized_reviews_file):
print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
sys.exit(1)
# Output path
output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/daily_summaries/category={target_category}/date={target_date}")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "daily_summary.parquet"
try:
# Generate daily summary combining phrase stats and recent reviews
generate_daily_summary_for_category_date(
phrase_stats_file,
categorized_reviews_file,
target_category,
target_date,
str(output_file)
)
print(f"Successfully generated daily summary for category {target_category} on {target_date}")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [
{"str": f"phrase_stats/category={target_category}/date={target_date}"},
{"str": f"categorized_reviews/category={target_category}/date={target_date}"}
],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:daily_summary_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error generating daily summary: {e}", file=sys.stderr)
sys.exit(1)
def generate_daily_summary_for_category_date(
phrase_stats_file: str,
categorized_reviews_file: str,
target_category: str,
target_date: str,
output_file: str
):
"""Generate daily summary combining top phrases and recent reviews."""
# Connect to DuckDB for processing
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Check if we have data
phrase_count = duckdb_conn.execute(f"SELECT COUNT(*) FROM parquet_scan('{phrase_stats_file}')").fetchone()[0]
review_count = duckdb_conn.execute(f"SELECT COUNT(*) FROM parquet_scan('{categorized_reviews_file}')").fetchone()[0]
if phrase_count == 0 and review_count == 0:
print(f"No data found, creating empty daily summary")
create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)
return
# Query to generate comprehensive daily summary
query = f"""
WITH top_phrases_per_podcast AS (
SELECT
podcast_id,
podcast_title,
ngram,
count as phrase_count,
avg_rating as phrase_avg_rating,
weighted_score,
ROW_NUMBER() OVER (PARTITION BY podcast_id ORDER BY weighted_score DESC) as phrase_rank
FROM parquet_scan('{phrase_stats_file}')
WHERE ngram IS NOT NULL
),
podcast_phrase_summary AS (
SELECT
podcast_id,
podcast_title,
STRING_AGG(ngram, '; ' ORDER BY weighted_score DESC) as top_phrases,
COUNT(*) as total_phrases,
AVG(phrase_avg_rating) as avg_phrase_rating,
SUM(weighted_score) as total_phrase_score
FROM top_phrases_per_podcast
WHERE phrase_rank <= 5 -- Top 5 phrases per podcast
GROUP BY podcast_id, podcast_title
),
podcast_review_summary AS (
SELECT
podcast_id,
podcast_title,
COUNT(*) as review_count,
AVG(rating::FLOAT) as avg_rating,
MIN(rating) as min_rating,
MAX(rating) as max_rating,
COUNT(CASE WHEN rating >= 4 THEN 1 END) as positive_reviews,
COUNT(CASE WHEN rating <= 2 THEN 1 END) as negative_reviews,
STRING_AGG(
CASE WHEN rating <= 2 AND length(content) > 20
THEN substring(content, 1, 200) || '...'
ELSE NULL
END,
' | '
ORDER BY rating ASC, length(content) DESC
) as sample_negative_reviews
FROM parquet_scan('{categorized_reviews_file}')
WHERE podcast_title IS NOT NULL
GROUP BY podcast_id, podcast_title
),
daily_summary AS (
SELECT
'{target_date}' as date,
'{target_category}' as category,
COALESCE(pps.podcast_id, prs.podcast_id) as podcast_id,
COALESCE(pps.podcast_title, prs.podcast_title) as podcast_title,
COALESCE(prs.review_count, 0) as review_count,
COALESCE(prs.avg_rating, 0.0) as avg_rating,
COALESCE(prs.positive_reviews, 0) as positive_reviews,
COALESCE(prs.negative_reviews, 0) as negative_reviews,
COALESCE(pps.top_phrases, 'No significant phrases') as top_phrases,
COALESCE(pps.total_phrases, 0) as total_phrases,
COALESCE(pps.avg_phrase_rating, 0.0) as avg_phrase_rating,
COALESCE(pps.total_phrase_score, 0.0) as total_phrase_score,
prs.sample_negative_reviews,
CASE
WHEN prs.avg_rating >= 4.0 AND pps.avg_phrase_rating >= 4.0 THEN 'Highly Positive'
WHEN prs.avg_rating >= 3.5 THEN 'Positive'
WHEN prs.avg_rating >= 2.5 THEN 'Mixed'
WHEN prs.avg_rating >= 1.5 THEN 'Negative'
ELSE 'Highly Negative'
END as sentiment_category,
(prs.review_count * prs.avg_rating * 0.6 + pps.total_phrase_score * 0.4) as overall_score
FROM podcast_phrase_summary pps
FULL OUTER JOIN podcast_review_summary prs
ON pps.podcast_id = prs.podcast_id
WHERE COALESCE(prs.review_count, 0) > 0 OR COALESCE(pps.total_phrases, 0) > 0
)
SELECT
date,
category,
podcast_id,
podcast_title,
review_count,
avg_rating,
positive_reviews,
negative_reviews,
top_phrases,
total_phrases,
avg_phrase_rating,
total_phrase_score,
sample_negative_reviews,
sentiment_category,
overall_score
FROM daily_summary
ORDER BY overall_score DESC, review_count DESC
"""
# Execute query and save to parquet
duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")
# Get row count for logging
count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
row_count = count_result[0] if count_result else 0
print(f"Generated daily summary for {row_count} podcasts")
if row_count == 0:
print(f"Warning: No summary data generated for category '{target_category}' on date '{target_date}'")
create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)
finally:
duckdb_conn.close()
def create_empty_daily_summary(category: str, date: str, output_file: str, duckdb_conn):
"""Create empty daily summary parquet file with correct schema."""
duckdb_conn.execute("DROP TABLE IF EXISTS empty_daily_summary")
duckdb_conn.execute("""
CREATE TABLE empty_daily_summary (
date VARCHAR,
category VARCHAR,
podcast_id VARCHAR,
podcast_title VARCHAR,
review_count BIGINT,
avg_rating DOUBLE,
positive_reviews BIGINT,
negative_reviews BIGINT,
top_phrases VARCHAR,
total_phrases BIGINT,
avg_phrase_rating DOUBLE,
total_phrase_score DOUBLE,
sample_negative_reviews VARCHAR,
sentiment_category VARCHAR,
overall_score DOUBLE
)
""")
duckdb_conn.execute(f"COPY empty_daily_summary TO '{output_file}' (FORMAT PARQUET)")
print("Created empty daily summary file")
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""
Centralized DuckDB utilities for handling extension issues in isolated environments.
"""
import duckdb
import sqlite3
import pandas as pd
from pathlib import Path
from typing import Any, Dict, List, Optional
import warnings
def create_duckdb_connection(enable_extensions: bool = True) -> duckdb.DuckDBPyConnection:
"""
Create a DuckDB connection with proper extension handling for isolated environments.
Args:
enable_extensions: Whether to try enabling extensions (may fail in isolated environments)
Returns:
DuckDB connection object
"""
conn = duckdb.connect()
if enable_extensions:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
try:
# Try to enable extensions, but don't fail if not available
conn.execute("SET autoinstall_known_extensions=1")
conn.execute("SET autoload_known_extensions=1")
except Exception:
# Extensions not available, will use fallback methods
pass
return conn
def sqlite_to_dataframe(sqlite_path: str, query: str, params: Optional[List[Any]] = None) -> pd.DataFrame:
"""
Execute a SQLite query and return results as a pandas DataFrame.
This is a fallback when DuckDB's sqlite_scan doesn't work.
Args:
sqlite_path: Path to SQLite database
query: SQL query to execute
params: Query parameters
Returns:
DataFrame with query results
"""
conn = sqlite3.connect(sqlite_path)
try:
if params:
df = pd.read_sql_query(query, conn, params=params)
else:
df = pd.read_sql_query(query, conn)
return df
finally:
conn.close()
def execute_query_with_fallback(
duckdb_conn: duckdb.DuckDBPyConnection,
sqlite_path: str,
query: str,
params: Optional[List[Any]] = None,
use_sqlite_scan: bool = True
) -> pd.DataFrame:
"""
Execute a query using DuckDB's sqlite_scan if available, otherwise fall back to direct SQLite access.
Args:
duckdb_conn: DuckDB connection
sqlite_path: Path to SQLite database
query: SQL query (should work with both sqlite_scan and direct SQLite)
params: Query parameters
use_sqlite_scan: Whether to try sqlite_scan first
Returns:
DataFrame with query results
"""
if use_sqlite_scan:
try:
# Try using DuckDB's sqlite_scan
if params:
result = duckdb_conn.execute(query, params).df()
else:
result = duckdb_conn.execute(query).df()
return result
except Exception as e:
print(f"sqlite_scan failed: {e}, falling back to direct SQLite access")
# Fallback: Use direct SQLite access
# Convert DuckDB sqlite_scan query to regular SQLite query
fallback_query = query.replace("sqlite_scan(?, 'reviews')", "reviews")
fallback_query = fallback_query.replace("sqlite_scan(?, 'podcasts')", "podcasts")
fallback_query = fallback_query.replace("sqlite_scan(?, 'categories')", "categories")
# Remove the sqlite_path parameter since we're connecting directly
if params and len(params) > 0 and params[0] == sqlite_path:
fallback_params = params[1:]
else:
fallback_params = params
return sqlite_to_dataframe(sqlite_path, fallback_query, fallback_params)
def save_dataframe_with_fallback(
df: pd.DataFrame,
output_path: str,
duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None,
format: str = "parquet"
) -> None:
"""
Save a DataFrame to the specified format, with fallback options if DuckDB extensions fail.
Args:
df: DataFrame to save
output_path: Output file path
duckdb_conn: Optional DuckDB connection (for parquet)
format: Output format ('parquet' or 'csv')
"""
output_path = Path(output_path)
if format.lower() == "parquet":
try:
if duckdb_conn:
# Try using DuckDB to write parquet
duckdb_conn.register('temp_df', df)
duckdb_conn.execute(f"COPY temp_df TO '{output_path}' (FORMAT PARQUET)")
return
except Exception as e:
print(f"DuckDB parquet write failed: {e}, falling back to pandas")
try:
# Fallback to pandas parquet (requires pyarrow)
df.to_parquet(output_path, index=False)
return
except Exception as e:
print(f"Pandas parquet write failed: {e}, falling back to CSV")
# Change extension to CSV and fall through
output_path = output_path.with_suffix('.csv')
format = "csv"
if format.lower() == "csv":
df.to_csv(output_path, index=False)
def read_dataframe_with_fallback(
file_path: str,
duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None
) -> pd.DataFrame:
"""
Read a DataFrame from file with fallback options.
Args:
file_path: Path to input file
duckdb_conn: Optional DuckDB connection
Returns:
DataFrame with file contents
"""
file_path = Path(file_path)
if file_path.suffix.lower() == '.parquet':
try:
if duckdb_conn:
# Try using DuckDB to read parquet
return duckdb_conn.execute(f"SELECT * FROM parquet_scan('{file_path}')").df()
except Exception:
pass
try:
# Fallback to pandas
return pd.read_parquet(file_path)
except Exception:
# Try CSV fallback
csv_path = file_path.with_suffix('.csv')
if csv_path.exists():
return pd.read_csv(csv_path)
raise
elif file_path.suffix.lower() == '.csv':
return pd.read_csv(file_path)
else:
raise ValueError(f"Unsupported file format: {file_path.suffix}")
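# Minimal usage sketch (illustrative; the SQLite path and output path below are
# assumptions matching the layout used by the extract jobs in this example).
if __name__ == "__main__":
    conn = create_duckdb_connection()
    try:
        db_path = "data/ingest/database.sqlite"
        # sqlite_scan query with the db path as a parameter, as the jobs do.
        df = execute_query_with_fallback(
            conn,
            db_path,
            "SELECT podcast_id, title FROM sqlite_scan(?, 'podcasts') LIMIT 5",
            [db_path],
        )
        save_dataframe_with_fallback(df, "/tmp/podcasts_sample.parquet", conn, "parquet")
        print(read_dataframe_with_fallback("/tmp/podcasts_sample.parquet", conn))
    finally:
        conn.close()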

View file

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
import sys
import json
import os
import sqlite3
import duckdb
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
def main():
if len(sys.argv) < 2:
print("Usage: extract_podcasts_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
# This job produces a single partition with all podcast metadata
if partition_ref != "podcasts/all":
print(f"Invalid partition ref: {partition_ref}. Expected 'podcasts/all'", file=sys.stderr)
sys.exit(1)
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [],
"args": [],
"env": {
"PARTITION_REF": partition_ref
}
}]
}
print(json.dumps(config))
def handle_exec(args):
partition_ref = os.getenv('PARTITION_REF', 'podcasts/all')
# Database paths
db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
if not os.path.exists(db_path):
# Fallback to relative path for development
db_path = "data/ingest/database.sqlite"
# Output path
output_dir = Path("/tmp/databuild_test/examples/podcast_reviews/podcasts")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "podcasts.parquet"
try:
# Extract all podcasts with their categories
extract_podcasts_with_categories(db_path, str(output_file))
print(f"Successfully extracted podcast metadata")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:extract_podcasts_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [],
"args": [],
"env": {"PARTITION_REF": partition_ref}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error extracting podcasts: {e}", file=sys.stderr)
sys.exit(1)
def extract_podcasts_with_categories(db_path: str, output_file: str):
"""Extract all podcasts with their categories and save as parquet."""
# Connect to SQLite
sqlite_conn = sqlite3.connect(db_path)
# Connect to DuckDB for processing
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Query to get podcasts with categories (handling multiple categories per podcast)
query = """
WITH podcast_categories AS (
SELECT
p.podcast_id,
p.itunes_id,
p.slug,
p.itunes_url,
p.title,
c.category,
ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category) as category_rank
FROM sqlite_scan(?, 'podcasts') p
LEFT JOIN sqlite_scan(?, 'categories') c ON p.podcast_id = c.podcast_id
),
primary_categories AS (
SELECT
podcast_id,
itunes_id,
slug,
itunes_url,
title,
category as primary_category
FROM podcast_categories
WHERE category_rank = 1
),
all_categories AS (
SELECT
podcast_id,
STRING_AGG(category, '|' ORDER BY category) as all_categories
FROM podcast_categories
WHERE category IS NOT NULL
GROUP BY podcast_id
)
SELECT
pc.podcast_id,
pc.itunes_id,
pc.slug,
pc.itunes_url,
pc.title,
pc.primary_category,
COALESCE(ac.all_categories, pc.primary_category) as all_categories
FROM primary_categories pc
LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
ORDER BY pc.title
"""
# Execute query and save to parquet
duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)", [db_path, db_path])
# Get row count for logging
count_result = duckdb_conn.execute(
"SELECT COUNT(*) FROM sqlite_scan(?, 'podcasts')",
[db_path]
).fetchone()
row_count = count_result[0] if count_result else 0
print(f"Extracted {row_count} podcasts with category information")
finally:
sqlite_conn.close()
duckdb_conn.close()
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
import sys
import json
import os
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Any
import re
from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback
def main():
if len(sys.argv) < 2:
print("Usage: extract_reviews_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
"""Parse partition ref like 'reviews/date=2020-01-01' into components."""
match = re.match(r'reviews/date=(\d{4}-\d{2}-\d{2})', partition_ref)
if not match:
raise ValueError(f"Invalid partition ref format: {partition_ref}")
return {"date": match.group(1)}
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
try:
parsed = parse_partition_ref(partition_ref)
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [],
"args": [date_str],
"env": {
"PARTITION_REF": partition_ref,
"TARGET_DATE": date_str
}
}]
}
print(json.dumps(config))
def handle_exec(args):
if len(args) < 1:
print("Exec mode requires date argument", file=sys.stderr)
sys.exit(1)
target_date = args[0]
partition_ref = os.getenv('PARTITION_REF', f'reviews/date={target_date}')
# Database paths
db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
if not os.path.exists(db_path):
# Fallback to relative path for development
db_path = "data/ingest/database.sqlite"
# Output path
output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "reviews.parquet"
try:
# Extract reviews for the target date
extract_reviews_for_date(db_path, target_date, str(output_file))
print(f"Successfully extracted reviews for {target_date}")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:extract_reviews_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [],
"args": [target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_DATE": target_date}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error extracting reviews: {e}", file=sys.stderr)
sys.exit(1)
def extract_reviews_for_date(db_path: str, target_date: str, output_file: str):
"""Extract reviews for a specific date and save as parquet."""
# Connect to DuckDB with extension handling
duckdb_conn = create_duckdb_connection()
try:
# Query reviews for the target date
query = """
SELECT
podcast_id,
title as review_title,
content,
rating,
author_id,
created_at,
DATE(created_at) as review_date
FROM sqlite_scan(?, 'reviews')
WHERE DATE(created_at) = ?
ORDER BY created_at
"""
# Execute query with fallback handling
df = execute_query_with_fallback(
duckdb_conn,
db_path,
query,
[db_path, target_date]
)
# Save to parquet with fallback
save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")
row_count = len(df)
print(f"Extracted {row_count} reviews for date {target_date}")
finally:
duckdb_conn.close()
if __name__ == "__main__":
main()
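# Illustrative invocations (the date is a placeholder):
#   python extract_reviews_job.py config reviews/date=2020-01-01
#     -> prints the single-task config JSON for that partition
#   python extract_reviews_job.py exec 2020-01-01
#     -> writes reviews.parquet and manifest.json under
#        /tmp/databuild_test/examples/podcast_reviews/reviews/date=2020-01-01/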

View file

@@ -0,0 +1,60 @@
#!/usr/bin/env python3
import sys
import json
import re
from collections import defaultdict
def main():
if len(sys.argv) < 2:
print("Usage: job_lookup.py partition_ref [partition_ref...]", file=sys.stderr)
sys.exit(1)
partition_refs = sys.argv[1:]
try:
result = defaultdict(list)
for partition_ref in partition_refs:
job_label = lookup_job_for_partition(partition_ref)
result[job_label].append(partition_ref)
# Output in the format expected by DataBuild
print(json.dumps({k: v for k, v in result.items() if v}))
except Exception as e:
print(f"Error in job lookup: {e}", file=sys.stderr)
sys.exit(1)
def lookup_job_for_partition(partition_ref: str) -> str:
"""Determine which job produces a given partition reference."""
# Extract reviews by date: reviews/date=YYYY-MM-DD
if re.match(r'reviews/date=\d{4}-\d{2}-\d{2}', partition_ref):
return "//:extract_reviews_job"
# Extract all podcasts: podcasts/all
if partition_ref == "podcasts/all":
return "//:extract_podcasts_job"
# Categorized reviews: categorized_reviews/category=CATEGORY/date=YYYY-MM-DD
if re.match(r'categorized_reviews/category=[^/]+/date=\d{4}-\d{2}-\d{2}', partition_ref):
return "//:categorize_reviews_job"
# Phrase models: phrase_models/category=CATEGORY/date=YYYY-MM-DD
if re.match(r'phrase_models/category=[^/]+/date=\d{4}-\d{2}-\d{2}', partition_ref):
return "//:phrase_modeling_job"
# Phrase statistics: phrase_stats/category=CATEGORY/date=YYYY-MM-DD
if re.match(r'phrase_stats/category=[^/]+/date=\d{4}-\d{2}-\d{2}', partition_ref):
return "//:phrase_stats_job"
# Daily summaries: daily_summaries/category=CATEGORY/date=YYYY-MM-DD
if re.match(r'daily_summaries/category=[^/]+/date=\d{4}-\d{2}-\d{2}', partition_ref):
return "//:daily_summary_job"
# If no match found, raise an error
raise ValueError(f"No job found for partition reference: {partition_ref}")
if __name__ == "__main__":
main()
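# Illustrative invocation:
#   python job_lookup.py reviews/date=2020-01-01 podcasts/all
#   -> {"//:extract_reviews_job": ["reviews/date=2020-01-01"], "//:extract_podcasts_job": ["podcasts/all"]}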

View file

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
import sys
import json
import os
import duckdb
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
import re
def main():
if len(sys.argv) < 2:
print("Usage: phrase_modeling_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
"""Parse partition ref like 'phrase_models/category=comedy/date=2020-01-01' into components."""
match = re.match(r'phrase_models/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
if not match:
raise ValueError(f"Invalid partition ref format: {partition_ref}")
return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: categorized reviews for the category and date
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
"PARTITION_REF": partition_ref,
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
print(json.dumps(config))
def handle_exec(args):
if len(args) < 2:
print("Exec mode requires category and date arguments", file=sys.stderr)
sys.exit(1)
target_category = args[0]
target_date = args[1]
partition_ref = os.getenv('PARTITION_REF', f'phrase_models/category={target_category}/date={target_date}')
# Input path
categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"
# Check input file exists
if not os.path.exists(categorized_reviews_file):
print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
sys.exit(1)
# Output path
output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/phrase_models/category={target_category}/date={target_date}")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "phrase_models.parquet"
try:
# Extract phrases using phrase modeling
extract_phrases_for_category_date(categorized_reviews_file, target_category, target_date, str(output_file))
print(f"Successfully extracted phrases for category {target_category} on {target_date}")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [
{"str": f"categorized_reviews/category={target_category}/date={target_date}"}
],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:phrase_modeling_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error extracting phrases: {e}", file=sys.stderr)
sys.exit(1)
def extract_phrases_for_category_date(categorized_reviews_file: str, target_category: str, target_date: str, output_file: str):
"""Extract phrases from categorized reviews using phrase binary or simple ngram extraction."""
# Connect to DuckDB for processing
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Check if phrase binary is available
phrase_binary = find_phrase_binary()
if phrase_binary:
# Use external phrase binary
phrases = extract_with_phrase_binary(categorized_reviews_file, phrase_binary)
else:
print("Warning: phrase binary not found, using simple ngram extraction")
# Fallback to simple ngram extraction
phrases = extract_simple_ngrams(categorized_reviews_file, duckdb_conn)
# Convert phrases to structured data and save
if phrases:
save_phrases_to_parquet(phrases, target_category, target_date, output_file, duckdb_conn)
else:
# Create empty parquet file with correct schema
create_empty_phrase_parquet(target_category, target_date, output_file, duckdb_conn)
finally:
duckdb_conn.close()
def find_phrase_binary() -> Optional[str]:
"""Find phrase binary in common locations."""
possible_paths = [
"/usr/local/bin/phrase",
"/usr/bin/phrase",
"./phrase",
"../phrase",
os.path.expanduser("~/bin/phrase")
]
for path in possible_paths:
if os.path.exists(path) and os.access(path, os.X_OK):
return path
return None
def extract_with_phrase_binary(categorized_reviews_file: str, phrase_binary: str) -> List[Dict[str, Any]]:
"""Extract phrases using the external phrase binary."""
# Read review content to temporary file
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Extract review content
content_query = f"SELECT content FROM parquet_scan('{categorized_reviews_file}') WHERE content IS NOT NULL AND content != ''"
results = duckdb_conn.execute(content_query).fetchall()
if not results:
return []
# Write content to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file:
for (content,) in results:
temp_file.write(content.strip() + '\n')
temp_file_path = temp_file.name
try:
# Run phrase binary
cmd = [phrase_binary, "--input", temp_file_path, "--output-format", "json"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode != 0:
print(f"Phrase binary failed: {result.stderr}", file=sys.stderr)
return []
# Parse JSON output
phrases_data = json.loads(result.stdout) if result.stdout.strip() else []
return phrases_data
finally:
# Clean up temp file
os.unlink(temp_file_path)
finally:
duckdb_conn.close()
def extract_simple_ngrams(categorized_reviews_file: str, duckdb_conn) -> List[Dict[str, Any]]:
"""Simple ngram extraction using SQL as fallback."""
# Simple phrase extraction using SQL
query = f"""
WITH word_tokens AS (
SELECT
unnest(string_split(lower(regexp_replace(content, '[^a-zA-Z0-9\\s]', '', 'g')), ' ')) as word,
podcast_id,
rating
FROM parquet_scan('{categorized_reviews_file}')
WHERE content IS NOT NULL AND content != ''
),
bigrams AS (
SELECT
word || ' ' || lead(word) OVER (PARTITION BY podcast_id ORDER BY rowid) as ngram,
rating
FROM (SELECT *, row_number() OVER () as rowid FROM word_tokens) t
WHERE word IS NOT NULL AND word != ''
),
phrase_stats AS (
SELECT
ngram,
COUNT(*) as frequency,
AVG(rating::FLOAT) as avg_rating,
MIN(rating) as min_rating,
MAX(rating) as max_rating
FROM bigrams
WHERE ngram IS NOT NULL AND ngram LIKE '% %' -- keep only real bigrams (contain a space)
GROUP BY ngram
HAVING COUNT(*) >= 3 -- Only phrases that appear at least 3 times
)
SELECT
ngram,
frequency,
avg_rating,
min_rating,
max_rating,
CASE
WHEN avg_rating >= 4.0 THEN frequency * avg_rating * 0.8
WHEN avg_rating <= 2.0 THEN frequency * (5.0 - avg_rating) * 0.8
ELSE frequency * 0.3
END as score
FROM phrase_stats
ORDER BY score DESC
LIMIT 1000
"""
try:
results = duckdb_conn.execute(query).fetchall()
phrases = []
for row in results:
ngram, frequency, avg_rating, min_rating, max_rating, score = row
phrases.append({
"ngram": ngram,
"frequency": frequency,
"avg_rating": float(avg_rating) if avg_rating else 0.0,
"min_rating": min_rating,
"max_rating": max_rating,
"score": float(score) if score else 0.0,
"hash": hash(ngram) % (2**31) # Simple hash for compatibility
})
return phrases
except Exception as e:
print(f"Error in simple ngram extraction: {e}", file=sys.stderr)
return []
def save_phrases_to_parquet(phrases: List[Dict[str, Any]], category: str, date: str, output_file: str, duckdb_conn):
"""Save phrases to parquet file."""
if not phrases:
create_empty_phrase_parquet(category, date, output_file, duckdb_conn)
return
# Create temporary table with phrases
duckdb_conn.execute("DROP TABLE IF EXISTS temp_phrases")
duckdb_conn.execute("""
CREATE TABLE temp_phrases (
date VARCHAR,
category VARCHAR,
hash BIGINT,
ngram VARCHAR,
score DOUBLE
)
""")
# Insert phrase data
for phrase in phrases:
duckdb_conn.execute("""
INSERT INTO temp_phrases VALUES (?, ?, ?, ?, ?)
""", [
date,
category,
phrase.get("hash", hash(phrase.get("ngram", "")) % (2**31)),
phrase.get("ngram", ""),
phrase.get("score", 0.0)
])
# Save to parquet
duckdb_conn.execute(f"COPY temp_phrases TO '{output_file}' (FORMAT PARQUET)")
print(f"Saved {len(phrases)} phrases to parquet file")
def create_empty_phrase_parquet(category: str, date: str, output_file: str, duckdb_conn):
"""Create empty parquet file with correct schema."""
duckdb_conn.execute("DROP TABLE IF EXISTS empty_phrases")
duckdb_conn.execute("""
CREATE TABLE empty_phrases (
date VARCHAR,
category VARCHAR,
hash BIGINT,
ngram VARCHAR,
score DOUBLE
)
""")
duckdb_conn.execute(f"COPY empty_phrases TO '{output_file}' (FORMAT PARQUET)")
print("Created empty phrase models file (no phrases extracted)")
if __name__ == "__main__":
main()

View file

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
import sys
import json
import os
import duckdb
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import re
def main():
if len(sys.argv) < 2:
print("Usage: phrase_stats_job.py {config|exec} [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
if command == "config":
handle_config(sys.argv[2:])
elif command == "exec":
handle_exec(sys.argv[2:])
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
"""Parse partition ref like 'phrase_stats/category=comedy/date=2020-01-01' into components."""
match = re.match(r'phrase_stats/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
if not match:
raise ValueError(f"Invalid partition ref format: {partition_ref}")
return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
if len(args) < 1:
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase models and categorized reviews for the category and date
phrase_models_ref = f"phrase_models/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_models_ref}},
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
"PARTITION_REF": partition_ref,
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
print(json.dumps(config))
def handle_exec(args):
if len(args) < 2:
print("Exec mode requires category and date arguments", file=sys.stderr)
sys.exit(1)
target_category = args[0]
target_date = args[1]
partition_ref = os.getenv('PARTITION_REF', f'phrase_stats/category={target_category}/date={target_date}')
# Input paths
phrase_models_file = f"/tmp/databuild_test/examples/podcast_reviews/phrase_models/category={target_category}/date={target_date}/phrase_models.parquet"
categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"
# Check input files exist
if not os.path.exists(phrase_models_file):
print(f"Phrase models file not found: {phrase_models_file}", file=sys.stderr)
sys.exit(1)
if not os.path.exists(categorized_reviews_file):
print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
sys.exit(1)
# Output path
output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/phrase_stats/category={target_category}/date={target_date}")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / "phrase_stats.parquet"
try:
# Calculate phrase statistics per podcast
calculate_phrase_stats_for_category_date(
phrase_models_file,
categorized_reviews_file,
target_category,
target_date,
str(output_file)
)
print(f"Successfully calculated phrase stats for category {target_category} on {target_date}")
print(f"Output written to: {output_file}")
# Create manifest
manifest = {
"outputs": [{"str": partition_ref}],
"inputs": [
{"str": f"phrase_models/category={target_category}/date={target_date}"},
{"str": f"categorized_reviews/category={target_category}/date={target_date}"}
],
"start_time": datetime.now().isoformat(),
"end_time": datetime.now().isoformat(),
"task": {
"job": {"label": "//examples/podcast_reviews:phrase_stats_job"},
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
}
}
}
manifest_file = output_dir / "manifest.json"
with open(manifest_file, 'w') as f:
json.dump(manifest, f, indent=2)
except Exception as e:
print(f"Error calculating phrase stats: {e}", file=sys.stderr)
sys.exit(1)
def calculate_phrase_stats_for_category_date(
phrase_models_file: str,
categorized_reviews_file: str,
target_category: str,
target_date: str,
output_file: str
):
"""Calculate phrase statistics per podcast by joining phrase models with reviews."""
# Connect to DuckDB for processing
duckdb_conn = duckdb.connect()
try:
# Try to install and load parquet extension, but don't fail if it's already installed
try:
duckdb_conn.execute("INSTALL parquet")
except Exception:
pass # Extension might already be installed
duckdb_conn.execute("LOAD parquet")
# Check if we have phrase models
phrase_count = duckdb_conn.execute(f"SELECT COUNT(*) FROM parquet_scan('{phrase_models_file}')").fetchone()[0]
if phrase_count == 0:
print(f"No phrase models found, creating empty phrase stats")
create_empty_phrase_stats(target_category, target_date, output_file, duckdb_conn)
return
# Query to calculate phrase statistics per podcast
query = f"""
WITH phrase_matches AS (
SELECT
r.podcast_id,
r.podcast_title,
r.rating,
r.content,
p.ngram,
p.score as phrase_score
FROM parquet_scan('{categorized_reviews_file}') r
JOIN parquet_scan('{phrase_models_file}') p
ON lower(r.content) LIKE '%' || lower(p.ngram) || '%'
WHERE r.content IS NOT NULL
AND r.content != ''
AND p.ngram IS NOT NULL
AND p.ngram != ''
),
podcast_phrase_stats AS (
SELECT
'{target_date}' as date,
'{target_category}' as category,
podcast_id,
podcast_title,
ngram,
COUNT(*) as count,
AVG(rating::FLOAT) as avg_rating,
MIN(rating) as min_rating,
MAX(rating) as max_rating,
AVG(phrase_score) as avg_phrase_score,
COUNT(*) * AVG(phrase_score) * AVG(rating::FLOAT) / 5.0 as weighted_score
FROM phrase_matches
GROUP BY podcast_id, podcast_title, ngram
HAVING COUNT(*) >= 2 -- Only include phrases that appear at least twice per podcast
)
SELECT
date,
category,
podcast_id,
podcast_title,
ngram,
count,
avg_rating,
min_rating,
max_rating,
avg_phrase_score,
weighted_score
FROM podcast_phrase_stats
ORDER BY weighted_score DESC, count DESC
"""
# Execute query and save to parquet
duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")
# Get row count for logging
count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
row_count = count_result[0] if count_result else 0
print(f"Calculated phrase statistics for {row_count} podcast-phrase combinations")
if row_count == 0:
print(f"Warning: No phrase matches found for category '{target_category}' on date '{target_date}'")
create_empty_phrase_stats(target_category, target_date, output_file, duckdb_conn)
finally:
duckdb_conn.close()
def create_empty_phrase_stats(category: str, date: str, output_file: str, duckdb_conn):
"""Create empty phrase stats parquet file with correct schema."""
duckdb_conn.execute("DROP TABLE IF EXISTS empty_phrase_stats")
duckdb_conn.execute("""
CREATE TABLE empty_phrase_stats (
date VARCHAR,
category VARCHAR,
podcast_id VARCHAR,
podcast_title VARCHAR,
ngram VARCHAR,
count BIGINT,
avg_rating DOUBLE,
min_rating INTEGER,
max_rating INTEGER,
avg_phrase_score DOUBLE,
weighted_score DOUBLE
)
""")
duckdb_conn.execute(f"COPY empty_phrase_stats TO '{output_file}' (FORMAT PARQUET)")
print("Created empty phrase stats file")
if __name__ == "__main__":
main()

View file

@@ -1,2 +1,4 @@
duckdb==1.2.2
pydantic==2.11.3
pydantic==2.11.3
pandas>=2.0.0
pyarrow>=10.0.0

View file

@@ -61,6 +61,160 @@ duckdb==1.2.2 \
--hash=sha256:fb9a2c77236fae079185a990434cb9d8432902488ba990235c702fc2692d2dcd \
--hash=sha256:fd9c434127fd1575694e1cf19a393bed301f5d6e80b4bcdae80caa368a61a678
# via -r requirements.in
numpy==2.3.1 \
--hash=sha256:0025048b3c1557a20bc80d06fdeb8cc7fc193721484cca82b2cfa072fec71a93 \
--hash=sha256:010ce9b4f00d5c036053ca684c77441f2f2c934fd23bee058b4d6f196efd8280 \
--hash=sha256:0bb3a4a61e1d327e035275d2a993c96fa786e4913aa089843e6a2d9dd205c66a \
--hash=sha256:0c4d9e0a8368db90f93bd192bfa771ace63137c3488d198ee21dfb8e7771916e \
--hash=sha256:15aa4c392ac396e2ad3d0a2680c0f0dee420f9fed14eef09bdb9450ee6dcb7b7 \
--hash=sha256:18703df6c4a4fee55fd3d6e5a253d01c5d33a295409b03fda0c86b3ca2ff41a1 \
--hash=sha256:1ec9ae20a4226da374362cca3c62cd753faf2f951440b0e3b98e93c235441d2b \
--hash=sha256:23ab05b2d241f76cb883ce8b9a93a680752fbfcbd51c50eff0b88b979e471d8c \
--hash=sha256:25a1992b0a3fdcdaec9f552ef10d8103186f5397ab45e2d25f8ac51b1a6b97e8 \
--hash=sha256:2959d8f268f3d8ee402b04a9ec4bb7604555aeacf78b360dc4ec27f1d508177d \
--hash=sha256:2a809637460e88a113e186e87f228d74ae2852a2e0c44de275263376f17b5bdc \
--hash=sha256:2fb86b7e58f9ac50e1e9dd1290154107e47d1eef23a0ae9145ded06ea606f992 \
--hash=sha256:36890eb9e9d2081137bd78d29050ba63b8dab95dff7912eadf1185e80074b2a0 \
--hash=sha256:39bff12c076812595c3a306f22bfe49919c5513aa1e0e70fac756a0be7c2a2b8 \
--hash=sha256:467db865b392168ceb1ef1ffa6f5a86e62468c43e0cfb4ab6da667ede10e58db \
--hash=sha256:4e602e1b8682c2b833af89ba641ad4176053aaa50f5cacda1a27004352dde943 \
--hash=sha256:5902660491bd7a48b2ec16c23ccb9124b8abfd9583c5fdfa123fe6b421e03de1 \
--hash=sha256:5ccb7336eaf0e77c1635b232c141846493a588ec9ea777a7c24d7166bb8533ae \
--hash=sha256:5f1b8f26d1086835f442286c1d9b64bb3974b0b1e41bb105358fd07d20872952 \
--hash=sha256:6269b9edfe32912584ec496d91b00b6d34282ca1d07eb10e82dfc780907d6c2e \
--hash=sha256:6ea9e48336a402551f52cd8f593343699003d2353daa4b72ce8d34f66b722070 \
--hash=sha256:762e0c0c6b56bdedfef9a8e1d4538556438288c4276901ea008ae44091954e29 \
--hash=sha256:7be91b2239af2658653c5bb6f1b8bccafaf08226a258caf78ce44710a0160d30 \
--hash=sha256:7dea630156d39b02a63c18f508f85010230409db5b2927ba59c8ba4ab3e8272e \
--hash=sha256:867ef172a0976aaa1f1d1b63cf2090de8b636a7674607d514505fb7276ab08fc \
--hash=sha256:8d5ee6eec45f08ce507a6570e06f2f879b374a552087a4179ea7838edbcbfa42 \
--hash=sha256:8e333040d069eba1652fb08962ec5b76af7f2c7bce1df7e1418c8055cf776f25 \
--hash=sha256:a5ee121b60aa509679b682819c602579e1df14a5b07fe95671c8849aad8f2115 \
--hash=sha256:a780033466159c2270531e2b8ac063704592a0bc62ec4a1b991c7c40705eb0e8 \
--hash=sha256:a894f3816eb17b29e4783e5873f92faf55b710c2519e5c351767c51f79d8526d \
--hash=sha256:a8b740f5579ae4585831b3cf0e3b0425c667274f82a484866d2adf9570539369 \
--hash=sha256:ad506d4b09e684394c42c966ec1527f6ebc25da7f4da4b1b056606ffe446b8a3 \
--hash=sha256:afed2ce4a84f6b0fc6c1ce734ff368cbf5a5e24e8954a338f3bdffa0718adffb \
--hash=sha256:b0b5397374f32ec0649dd98c652a1798192042e715df918c20672c62fb52d4b8 \
--hash=sha256:bada6058dd886061f10ea15f230ccf7dfff40572e99fef440a4a857c8728c9c0 \
--hash=sha256:c4913079974eeb5c16ccfd2b1f09354b8fed7e0d6f2cab933104a09a6419b1ee \
--hash=sha256:c5bdf2015ccfcee8253fb8be695516ac4457c743473a43290fd36eba6a1777eb \
--hash=sha256:c6e0bf9d1a2f50d2b65a7cf56db37c095af17b59f6c132396f7c6d5dd76484df \
--hash=sha256:ce2ce9e5de4703a673e705183f64fd5da5bf36e7beddcb63a25ee2286e71ca48 \
--hash=sha256:cfecc7822543abdea6de08758091da655ea2210b8ffa1faf116b940693d3df76 \
--hash=sha256:d4580adadc53311b163444f877e0789f1c8861e2698f6b2a4ca852fda154f3ff \
--hash=sha256:d70f20df7f08b90a2062c1f07737dd340adccf2068d0f1b9b3d56e2038979fee \
--hash=sha256:e344eb79dab01f1e838ebb67aab09965fb271d6da6b00adda26328ac27d4a66e \
--hash=sha256:e610832418a2bc09d974cc9fecebfa51e9532d6190223bc5ef6a7402ebf3b5cb \
--hash=sha256:e772dda20a6002ef7061713dc1e2585bc1b534e7909b2030b5a46dae8ff077ab \
--hash=sha256:e7cbf5a5eafd8d230a3ce356d892512185230e4781a361229bd902ff403bc660 \
--hash=sha256:eabd7e8740d494ce2b4ea0ff05afa1b7b291e978c0ae075487c51e8bd93c0c68 \
--hash=sha256:ebb8603d45bc86bbd5edb0d63e52c5fd9e7945d3a503b77e486bd88dde67a19b \
--hash=sha256:ec0bdafa906f95adc9a0c6f26a4871fa753f25caaa0e032578a30457bff0af6a \
--hash=sha256:eccb9a159db9aed60800187bc47a6d3451553f0e1b08b068d8b277ddfbb9b244 \
--hash=sha256:ee8340cb48c9b7a5899d1149eece41ca535513a9698098edbade2a8e7a84da77
# via pandas
pandas==2.3.0 \
--hash=sha256:034abd6f3db8b9880aaee98f4f5d4dbec7c4829938463ec046517220b2f8574e \
--hash=sha256:094e271a15b579650ebf4c5155c05dcd2a14fd4fdd72cf4854b2f7ad31ea30be \
--hash=sha256:14a0cc77b0f089d2d2ffe3007db58f170dae9b9f54e569b299db871a3ab5bf46 \
--hash=sha256:1a881bc1309f3fce34696d07b00f13335c41f5f5a8770a33b09ebe23261cfc67 \
--hash=sha256:1d2b33e68d0ce64e26a4acc2e72d747292084f4e8db4c847c6f5f6cbe56ed6d8 \
--hash=sha256:213cd63c43263dbb522c1f8a7c9d072e25900f6975596f883f4bebd77295d4f3 \
--hash=sha256:23c2b2dc5213810208ca0b80b8666670eb4660bbfd9d45f58592cc4ddcfd62e1 \
--hash=sha256:2c7e2fc25f89a49a11599ec1e76821322439d90820108309bf42130d2f36c983 \
--hash=sha256:2eb4728a18dcd2908c7fccf74a982e241b467d178724545a48d0caf534b38ebf \
--hash=sha256:34600ab34ebf1131a7613a260a61dbe8b62c188ec0ea4c296da7c9a06b004133 \
--hash=sha256:39ff73ec07be5e90330cc6ff5705c651ace83374189dcdcb46e6ff54b4a72cd6 \
--hash=sha256:404d681c698e3c8a40a61d0cd9412cc7364ab9a9cc6e144ae2992e11a2e77a20 \
--hash=sha256:40cecc4ea5abd2921682b57532baea5588cc5f80f0231c624056b146887274d2 \
--hash=sha256:430a63bae10b5086995db1b02694996336e5a8ac9a96b4200572b413dfdfccb9 \
--hash=sha256:4930255e28ff5545e2ca404637bcc56f031893142773b3468dc021c6c32a1390 \
--hash=sha256:6021910b086b3ca756755e86ddc64e0ddafd5e58e076c72cb1585162e5ad259b \
--hash=sha256:625466edd01d43b75b1883a64d859168e4556261a5035b32f9d743b67ef44634 \
--hash=sha256:75651c14fde635e680496148a8526b328e09fe0572d9ae9b638648c46a544ba3 \
--hash=sha256:84141f722d45d0c2a89544dd29d35b3abfc13d2250ed7e68394eda7564bd6324 \
--hash=sha256:8adff9f138fc614347ff33812046787f7d43b3cef7c0f0171b3340cae333f6ca \
--hash=sha256:951805d146922aed8357e4cc5671b8b0b9be1027f0619cea132a9f3f65f2f09c \
--hash=sha256:9efc0acbbffb5236fbdf0409c04edce96bec4bdaa649d49985427bd1ec73e085 \
--hash=sha256:9ff730713d4c4f2f1c860e36c005c7cefc1c7c80c21c0688fd605aa43c9fcf09 \
--hash=sha256:a6872d695c896f00df46b71648eea332279ef4077a409e2fe94220208b6bb675 \
--hash=sha256:b198687ca9c8529662213538a9bb1e60fa0bf0f6af89292eb68fea28743fcd5a \
--hash=sha256:b9d8c3187be7479ea5c3d30c32a5d73d62a621166675063b2edd21bc47614027 \
--hash=sha256:ba24af48643b12ffe49b27065d3babd52702d95ab70f50e1b34f71ca703e2c0d \
--hash=sha256:bb32dc743b52467d488e7a7c8039b821da2826a9ba4f85b89ea95274f863280f \
--hash=sha256:bb3be958022198531eb7ec2008cfc78c5b1eed51af8600c6c5d9160d89d8d249 \
--hash=sha256:bf5be867a0541a9fb47a4be0c5790a4bccd5b77b92f0a59eeec9375fafc2aa14 \
--hash=sha256:c06f6f144ad0a1bf84699aeea7eff6068ca5c63ceb404798198af7eb86082e33 \
--hash=sha256:c6da97aeb6a6d233fb6b17986234cc723b396b50a3c6804776351994f2a658fd \
--hash=sha256:e0f51973ba93a9f97185049326d75b942b9aeb472bec616a129806facb129ebb \
--hash=sha256:e1991bbb96f4050b09b5f811253c4f3cf05ee89a589379aa36cd623f21a31d6f \
--hash=sha256:e5f08eb9a445d07720776df6e641975665c9ea12c9d8a331e0f6890f2dcd76ef \
--hash=sha256:e78ad363ddb873a631e92a3c063ade1ecfb34cae71e9a2be6ad100f875ac1042 \
--hash=sha256:ed16339bc354a73e0a609df36d256672c7d296f3f767ac07257801aa064ff73c \
--hash=sha256:f4dd97c19bd06bc557ad787a15b6489d2614ddaab5d104a0310eb314c724b2d2 \
--hash=sha256:f925f1ef673b4bd0271b1809b72b3270384f2b7d9d14a189b12b7fc02574d575 \
--hash=sha256:f95a2aef32614ed86216d3c450ab12a4e82084e8102e355707a1d96e33d51c34 \
--hash=sha256:fa07e138b3f6c04addfeaf56cc7fdb96c3b68a3fe5e5401251f231fce40a0d7a \
--hash=sha256:fa35c266c8cd1a67d75971a1912b185b492d257092bdd2709bbdebe574ed228d
# via -r requirements.in
pyarrow==20.0.0 \
--hash=sha256:00138f79ee1b5aca81e2bdedb91e3739b987245e11fa3c826f9e57c5d102fb75 \
--hash=sha256:11529a2283cb1f6271d7c23e4a8f9f8b7fd173f7360776b668e509d712a02eec \
--hash=sha256:15aa1b3b2587e74328a730457068dc6c89e6dcbf438d4369f572af9d320a25ee \
--hash=sha256:1bcbe471ef3349be7714261dea28fe280db574f9d0f77eeccc195a2d161fd861 \
--hash=sha256:204a846dca751428991346976b914d6d2a82ae5b8316a6ed99789ebf976551e6 \
--hash=sha256:211d5e84cecc640c7a3ab900f930aaff5cd2702177e0d562d426fb7c4f737781 \
--hash=sha256:24ca380585444cb2a31324c546a9a56abbe87e26069189e14bdba19c86c049f0 \
--hash=sha256:2c3a01f313ffe27ac4126f4c2e5ea0f36a5fc6ab51f8726cf41fee4b256680bd \
--hash=sha256:30b3051b7975801c1e1d387e17c588d8ab05ced9b1e14eec57915f79869b5031 \
--hash=sha256:3346babb516f4b6fd790da99b98bed9708e3f02e734c84971faccb20736848dc \
--hash=sha256:3e1f8a47f4b4ae4c69c4d702cfbdfe4d41e18e5c7ef6f1bb1c50918c1e81c57b \
--hash=sha256:4250e28a22302ce8692d3a0e8ec9d9dde54ec00d237cff4dfa9c1fbf79e472a8 \
--hash=sha256:4680f01ecd86e0dd63e39eb5cd59ef9ff24a9d166db328679e36c108dc993d4c \
--hash=sha256:4a8b029a07956b8d7bd742ffca25374dd3f634b35e46cc7a7c3fa4c75b297191 \
--hash=sha256:4ba3cf4182828be7a896cbd232aa8dd6a31bd1f9e32776cc3796c012855e1199 \
--hash=sha256:5605919fbe67a7948c1f03b9f3727d82846c053cd2ce9303ace791855923fd20 \
--hash=sha256:5f0fb1041267e9968c6d0d2ce3ff92e3928b243e2b6d11eeb84d9ac547308232 \
--hash=sha256:6102b4864d77102dbbb72965618e204e550135a940c2534711d5ffa787df2a5a \
--hash=sha256:6415a0d0174487456ddc9beaead703d0ded5966129fa4fd3114d76b5d1c5ceae \
--hash=sha256:6bb830757103a6cb300a04610e08d9636f0cd223d32f388418ea893a3e655f1c \
--hash=sha256:6fc1499ed3b4b57ee4e090e1cea6eb3584793fe3d1b4297bbf53f09b434991a5 \
--hash=sha256:75a51a5b0eef32727a247707d4755322cb970be7e935172b6a3a9f9ae98404ba \
--hash=sha256:7a3a5dcf54286e6141d5114522cf31dd67a9e7c9133d150799f30ee302a7a1ab \
--hash=sha256:7f4c8534e2ff059765647aa69b75d6543f9fef59e2cd4c6d18015192565d2b70 \
--hash=sha256:82f1ee5133bd8f49d31be1299dc07f585136679666b502540db854968576faf9 \
--hash=sha256:851c6a8260ad387caf82d2bbf54759130534723e37083111d4ed481cb253cc0d \
--hash=sha256:89e030dc58fc760e4010148e6ff164d2f44441490280ef1e97a542375e41058e \
--hash=sha256:95b330059ddfdc591a3225f2d272123be26c8fa76e8c9ee1a77aad507361cfdb \
--hash=sha256:96d6a0a37d9c98be08f5ed6a10831d88d52cac7b13f5287f1e0f625a0de8062b \
--hash=sha256:96e37f0766ecb4514a899d9a3554fadda770fb57ddf42b63d80f14bc20aa7db3 \
--hash=sha256:97c8dc984ed09cb07d618d57d8d4b67a5100a30c3818c2fb0b04599f0da2de7b \
--hash=sha256:991f85b48a8a5e839b2128590ce07611fae48a904cae6cab1f089c5955b57eb5 \
--hash=sha256:9965a050048ab02409fb7cbbefeedba04d3d67f2cc899eff505cc084345959ca \
--hash=sha256:9b71daf534f4745818f96c214dbc1e6124d7daf059167330b610fc69b6f3d3e3 \
--hash=sha256:a15532e77b94c61efadde86d10957950392999503b3616b2ffcef7621a002893 \
--hash=sha256:a18a14baef7d7ae49247e75641fd8bcbb39f44ed49a9fc4ec2f65d5031aa3b96 \
--hash=sha256:a1f60dc14658efaa927f8214734f6a01a806d7690be4b3232ba526836d216122 \
--hash=sha256:a2791f69ad72addd33510fec7bb14ee06c2a448e06b649e264c094c5b5f7ce28 \
--hash=sha256:a5704f29a74b81673d266e5ec1fe376f060627c2e42c5c7651288ed4b0db29e9 \
--hash=sha256:a6ad3e7758ecf559900261a4df985662df54fb7fdb55e8e3b3aa99b23d526b62 \
--hash=sha256:aa0d288143a8585806e3cc7c39566407aab646fb9ece164609dac1cfff45f6ae \
--hash=sha256:b6953f0114f8d6f3d905d98e987d0924dabce59c3cda380bdfaa25a6201563b4 \
--hash=sha256:b8ff87cc837601532cc8242d2f7e09b4e02404de1b797aee747dd4ba4bd6313f \
--hash=sha256:c7dd06fd7d7b410ca5dc839cc9d485d2bc4ae5240851bcd45d85105cc90a47d7 \
--hash=sha256:ca151afa4f9b7bc45bcc791eb9a89e90a9eb2772767d0b1e5389609c7d03db63 \
--hash=sha256:cb497649e505dc36542d0e68eca1a3c94ecbe9799cb67b578b55f2441a247fbc \
--hash=sha256:d5382de8dc34c943249b01c19110783d0d64b207167c728461add1ecc2db88e4 \
--hash=sha256:db53390eaf8a4dab4dbd6d93c85c5cf002db24902dbff0ca7d988beb5c9dd15b \
--hash=sha256:dd43f58037443af715f34f1322c782ec463a3c8a94a85fdb2d987ceb5658e061 \
--hash=sha256:e22f80b97a271f0a7d9cd07394a7d348f80d3ac63ed7cc38b6d1b696ab3b2619 \
--hash=sha256:e724a3fd23ae5b9c010e7be857f4405ed5e679db5c93e66204db1a69f733936a \
--hash=sha256:e8b88758f9303fa5a83d6c90e176714b2fd3852e776fc2d7e42a22dd6c2fb368 \
--hash=sha256:f2d67ac28f57a362f1a2c1e6fa98bfe2f03230f7e15927aecd067433b1e70ce8 \
--hash=sha256:f3b117b922af5e4c6b9a9115825726cac7d8b1421c37c2b5e24fbacc8930612c \
--hash=sha256:febc4a913592573c8d5805091a6c2b5064c8bd6e002131f01061797d91c783c1
# via -r requirements.in
pydantic==2.11.3 \
--hash=sha256:7471657138c16adad9322fe3070c0116dd6c3ad8d649300e3cbdfe91f4db4ec3 \
--hash=sha256:a082753436a07f9ba1289c6ffa01cd93db3548776088aa917cc43b63f68fa60f
@@ -166,6 +320,18 @@ pydantic-core==2.33.1 \
--hash=sha256:fc903512177361e868bc1f5b80ac8c8a6e05fcdd574a5fb5ffeac5a9982b9e89 \
--hash=sha256:fe44d56aa0b00d66640aa84a3cbe80b7a3ccdc6f0b1ca71090696a6d4777c091
# via pydantic
python-dateutil==2.9.0.post0 \
--hash=sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3 \
--hash=sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427
# via pandas
pytz==2025.2 \
--hash=sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3 \
--hash=sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00
# via pandas
six==1.17.0 \
--hash=sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274 \
--hash=sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81
# via python-dateutil
typing-extensions==4.13.2 \
--hash=sha256:a439e7c04b49fec3e5d3e2beaa21755cadbbdc391694e28ccdd36ca4a1408f8c \
--hash=sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef
@@ -177,3 +343,7 @@ typing-inspection==0.4.0 \
--hash=sha256:50e72559fcd2a6367a19f7a7e610e6afcb9fac940c650290eed893d61386832f \
--hash=sha256:9765c87de36671694a67904bf2c96e395be9c6439bb6c87b5142569dcdd65122
# via pydantic
tzdata==2025.2 \
--hash=sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8 \
--hash=sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9
# via pandas

View file

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
Basic tests for the podcast reviews jobs.
Runs each job script in config mode, checks the emitted configs, and exercises job_lookup routing.
"""
import json
import subprocess
import sys
def run_job_config(job_script: str, partition_ref: str):
"""Run a job in config mode and return the parsed config."""
cmd = [sys.executable, job_script, "config", partition_ref]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Config failed for {job_script}: {result.stderr}")
return json.loads(result.stdout)
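# For reference, a config response in the shape these tests assert against looks
# roughly like the literal below (field names come from the assertions that follow;
# the concrete values mirror the phrase_stats test case further down):
EXAMPLE_CONFIG_RESPONSE = {
    "configs": [
        {
            "outputs": [{"str": "phrase_stats/category=comedy/date=2020-01-01"}],
            "inputs": [
                {"partition_ref": {"str": "phrase_models/category=comedy/date=2020-01-01"}},
                {"partition_ref": {"str": "categorized_reviews/category=comedy/date=2020-01-01"}},
            ],
            "args": ["comedy", "2020-01-01"],
            "env": {"TARGET_CATEGORY": "comedy", "TARGET_DATE": "2020-01-01"},
        }
    ]
}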
def test_extract_reviews_job():
"""Test extract_reviews_job configuration."""
print("Testing extract_reviews_job...")
config = run_job_config("extract_reviews_job.py", "reviews/date=2020-01-01")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "reviews/date=2020-01-01"}]
assert job_config["inputs"] == []
assert job_config["args"] == ["2020-01-01"]
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
print("✓ extract_reviews_job config test passed")
def test_extract_podcasts_job():
"""Test extract_podcasts_job configuration."""
print("Testing extract_podcasts_job...")
config = run_job_config("extract_podcasts_job.py", "podcasts/all")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "podcasts/all"}]
assert job_config["inputs"] == []
assert job_config["args"] == []
assert job_config["env"]["PARTITION_REF"] == "podcasts/all"
print("✓ extract_podcasts_job config test passed")
def test_categorize_reviews_job():
"""Test categorize_reviews_job configuration."""
print("Testing categorize_reviews_job...")
config = run_job_config("categorize_reviews_job.py", "categorized_reviews/category=comedy/date=2020-01-01")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "categorized_reviews/category=comedy/date=2020-01-01"}]
assert len(job_config["inputs"]) == 2
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
assert "reviews/date=2020-01-01" in input_refs
assert "podcasts/all" in input_refs
assert job_config["args"] == ["comedy", "2020-01-01"]
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
print("✓ categorize_reviews_job config test passed")
def test_phrase_modeling_job():
"""Test phrase_modeling_job configuration."""
print("Testing phrase_modeling_job...")
config = run_job_config("phrase_modeling_job.py", "phrase_models/category=comedy/date=2020-01-01")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "phrase_models/category=comedy/date=2020-01-01"}]
assert len(job_config["inputs"]) == 1
assert job_config["inputs"][0]["partition_ref"]["str"] == "categorized_reviews/category=comedy/date=2020-01-01"
assert job_config["args"] == ["comedy", "2020-01-01"]
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
print("✓ phrase_modeling_job config test passed")
def test_phrase_stats_job():
"""Test phrase_stats_job configuration."""
print("Testing phrase_stats_job...")
config = run_job_config("phrase_stats_job.py", "phrase_stats/category=comedy/date=2020-01-01")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "phrase_stats/category=comedy/date=2020-01-01"}]
assert len(job_config["inputs"]) == 2
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
assert "phrase_models/category=comedy/date=2020-01-01" in input_refs
assert "categorized_reviews/category=comedy/date=2020-01-01" in input_refs
assert job_config["args"] == ["comedy", "2020-01-01"]
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
print("✓ phrase_stats_job config test passed")
def test_daily_summary_job():
"""Test daily_summary_job configuration."""
print("Testing daily_summary_job...")
config = run_job_config("daily_summary_job.py", "daily_summaries/category=comedy/date=2020-01-01")
assert len(config["configs"]) == 1
job_config = config["configs"][0]
assert job_config["outputs"] == [{"str": "daily_summaries/category=comedy/date=2020-01-01"}]
assert len(job_config["inputs"]) == 2
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
assert "phrase_stats/category=comedy/date=2020-01-01" in input_refs
assert "categorized_reviews/category=comedy/date=2020-01-01" in input_refs
assert job_config["args"] == ["comedy", "2020-01-01"]
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
print("✓ daily_summary_job config test passed")
def test_job_lookup():
"""Test job_lookup functionality."""
print("Testing job_lookup...")
test_cases = [
("reviews/date=2020-01-01", ":extract_reviews_job"),
("podcasts/all", ":extract_podcasts_job"),
("categorized_reviews/category=comedy/date=2020-01-01", ":categorize_reviews_job"),
("phrase_models/category=comedy/date=2020-01-01", ":phrase_modeling_job"),
("phrase_stats/category=comedy/date=2020-01-01", ":phrase_stats_job"),
("daily_summaries/category=comedy/date=2020-01-01", ":daily_summary_job"),
]
for partition_ref, expected_job_label in test_cases:
cmd = [sys.executable, "job_lookup.py", partition_ref]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Job lookup failed for {partition_ref}: {result.stderr}")
response = json.loads(result.stdout)
# New format: {job_label: [partition_refs]}
assert expected_job_label in response
assert partition_ref in response[expected_job_label]
print("✓ job_lookup test passed")
def test_invalid_partition_refs():
"""Test that invalid partition refs are handled properly."""
print("Testing invalid partition refs...")
invalid_refs = [
"invalid/ref",
"reviews/date=invalid-date",
"categorized_reviews/category=/date=2020-01-01", # missing category
"unknown/partition=ref",
]
for invalid_ref in invalid_refs:
cmd = [sys.executable, "job_lookup.py", invalid_ref]
result = subprocess.run(cmd, capture_output=True, text=True)
# Should fail for invalid refs
assert result.returncode != 0, f"Expected failure for invalid ref: {invalid_ref}"
print("✓ invalid partition refs test passed")
def main():
"""Run all tests."""
print("Running podcast reviews job tests...")
print("=" * 50)
try:
test_extract_reviews_job()
test_extract_podcasts_job()
test_categorize_reviews_job()
test_phrase_modeling_job()
test_phrase_stats_job()
test_daily_summary_job()
test_job_lookup()
test_invalid_partition_refs()
print("=" * 50)
print("All tests passed! ✅")
except Exception as e:
print(f"Test failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
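# Hedged usage note (assumption: this test file sits in the same directory as the job
# scripts, since run_job_config invokes them by bare filename):
#
#   cd examples/podcast_reviews   # hypothetical path
#   python test_jobs.py           # hypothetical filename for this file
#
# Each passing test prints a checkmark; the first failing assertion makes main() exit
# with status 1.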