diff --git a/CLAUDE.md b/CLAUDE.md
index b92e370..a5ab040 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -25,4 +25,65 @@ DataBuild is a bazel-based data build system. Key files:
 ## Key Components
 - Graph analysis/execution in Rust
 - Bazel rules for job orchestration
-- Java/Python examples for different use cases
\ No newline at end of file
+- Java/Python examples for different use cases
+
+## DataBuild Job Architecture
+
+### Job Target Structure
+Each DataBuild job creates three Bazel targets:
+- `job_name.cfg` - Configuration target (calls binary with "config" subcommand)
+- `job_name.exec` - Execution target (calls binary with "exec" subcommand)
+- `job_name` - Main job target (pipes config output to exec input)
+
+### Unified Job Binary Pattern
+Jobs use a single binary with subcommands:
+```python
+def main():
+    command = sys.argv[1]  # "config" or "exec"
+    if command == "config":
+        handle_config(sys.argv[2:])  # Output job configuration JSON
+    elif command == "exec":
+        handle_exec(sys.argv[2:])  # Perform actual work
+```
+
+### Job Configuration Requirements
+**CRITICAL**: Job configs must include non-empty `args` for execution to work:
+```python
+config = {
+    "configs": [{
+        "outputs": [{"str": partition_ref}],
+        "inputs": [...],
+        "args": ["some_arg"],  # REQUIRED: Cannot be empty []
+        "env": {"PARTITION_REF": partition_ref}
+    }]
+}
+```
+
+Jobs with `"args": []` will only have their config function called during execution; their exec function is never invoked.
+
+### DataBuild Execution Flow
+1. **Planning Phase**: DataBuild calls `.cfg` targets to get job configurations
+2. **Execution Phase**: DataBuild calls main job targets which pipe config to exec
+3. **Job Resolution**: Job lookup returns base job names (e.g., `//:job_name`), not `.cfg` variants
+
+### Graph Configuration
+```python
+databuild_graph(
+    name = "my_graph",
+    jobs = [":job1", ":job2"],  # Reference base job targets
+    lookup = ":job_lookup",  # Binary that routes partition refs to jobs
+)
+```
+
+### Job Lookup Pattern
+```python
+def lookup_job_for_partition(partition_ref: str) -> str:
+    if pattern.match(partition_ref):
+        return "//:job_name"  # Return base job target
+    raise ValueError(f"No job found for: {partition_ref}")
+```
+
+### Common Pitfalls
+- **Empty args**: Jobs with `"args": []` won't execute properly
+- **Wrong target refs**: Job lookup must return base targets, not `.cfg` variants
+- **Missing partition refs**: All outputs must be addressable via partition references
\ No newline at end of file
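Taken together, the conventions documented in the CLAUDE.md changes above imply a job binary shaped roughly like the sketch below. This is a hypothetical, minimal example rather than code from the repository: the partition ref format, argument handling, and output are assumptions for illustration; only the subcommand dispatch, the config JSON shape (`outputs`/`inputs`/`args`/`env`), and the non-empty `args` requirement come from the documentation above.

```python
#!/usr/bin/env python3
"""Minimal sketch of a unified DataBuild job binary (hypothetical example)."""
import json
import os
import sys


def handle_config(args):
    # Emit one job configuration per requested partition.
    # The "example_data/date=..." partition ref format is assumed for illustration.
    date = args[0] if args else "2024-01-01"
    partition_ref = f"example_data/date={date}"
    config = {
        "configs": [{
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "args": [date],  # non-empty, so the exec subcommand actually runs
            "env": {"PARTITION_REF": partition_ref},
        }]
    }
    print(json.dumps(config))


def handle_exec(args):
    # Produce the partition named in PARTITION_REF (placeholder work only).
    partition_ref = os.environ.get("PARTITION_REF", "example_data/date=unknown")
    print(f"Producing {partition_ref} with args {args}")


def main():
    if len(sys.argv) < 2 or sys.argv[1] not in ("config", "exec"):
        print("Usage: job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)
    if sys.argv[1] == "config":
        handle_config(sys.argv[2:])
    else:
        handle_exec(sys.argv[2:])


if __name__ == "__main__":
    main()
```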
f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}/reviews" + podcasts_base = "/tmp/databuild_test/examples/podcast_reviews/podcasts/podcasts" - # Check input files exist - if not os.path.exists(reviews_file): - print(f"Reviews file not found: {reviews_file}", file=sys.stderr) + reviews_file = None + podcasts_file = None + + # Find reviews file (parquet or CSV) + for ext in ['.parquet', '.csv']: + candidate = reviews_base + ext + if os.path.exists(candidate): + reviews_file = candidate + break + + # Find podcasts file (parquet or CSV) + for ext in ['.parquet', '.csv']: + candidate = podcasts_base + ext + if os.path.exists(candidate): + podcasts_file = candidate + break + + if not reviews_file: + print(f"Reviews file not found: {reviews_base}.(parquet|csv)", file=sys.stderr) sys.exit(1) - if not os.path.exists(podcasts_file): - print(f"Podcasts file not found: {podcasts_file}", file=sys.stderr) + if not podcasts_file: + print(f"Podcasts file not found: {podcasts_base}.(parquet|csv)", file=sys.stderr) sys.exit(1) # Output path @@ -136,50 +152,47 @@ def handle_exec(args): def categorize_reviews_for_category_date(reviews_file: str, podcasts_file: str, target_category: str, output_file: str): """Join reviews with podcast categories and filter for target category.""" - # Connect to DuckDB for processing - duckdb_conn = duckdb.connect() + # Connect to DuckDB with extension handling + duckdb_conn = create_duckdb_connection() try: - # Try to install and load parquet extension, but don't fail if it's already installed - try: - duckdb_conn.execute("INSTALL parquet") - except Exception: - pass # Extension might already be installed + # Read input files with fallback handling + reviews_df = read_dataframe_with_fallback(reviews_file, duckdb_conn) + podcasts_df = read_dataframe_with_fallback(podcasts_file, duckdb_conn) - duckdb_conn.execute("LOAD parquet") + # Perform join and filtering in pandas + import pandas as pd - # Query to join reviews with podcasts and filter by category - query = f""" - SELECT - r.podcast_id, - r.review_title, - r.content, - r.rating, - r.author_id, - r.created_at, - r.review_date, - p.title as podcast_title, - p.primary_category, - p.all_categories, - '{target_category}' as target_category - FROM parquet_scan('{reviews_file}') r - JOIN parquet_scan('{podcasts_file}') p ON r.podcast_id = p.podcast_id - WHERE p.primary_category = '{target_category}' - OR p.all_categories LIKE '%{target_category}%' - ORDER BY r.created_at - """ + # Join reviews with podcasts + joined_df = reviews_df.merge(podcasts_df, on='podcast_id', how='inner') - # Execute query and save to parquet - duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)") + # Filter by category + filtered_df = joined_df[ + (joined_df['primary_category'] == target_category) | + (joined_df['all_categories'].str.contains(target_category, na=False)) + ].copy() - # Get row count for logging - count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone() + # Add target category column + filtered_df['target_category'] = target_category - row_count = count_result[0] if count_result else 0 + # Select and rename columns to match expected output + result_df = filtered_df[[ + 'podcast_id', 'review_title', 'content', 'rating', 'author_id', + 'created_at', 'review_date', 'title', 'primary_category', + 'all_categories', 'target_category' + ]].rename(columns={'title': 'podcast_title'}) + + # Sort by created_at + result_df = result_df.sort_values('created_at') + + # Save to parquet 
diff --git a/examples/podcast_reviews/extract_podcasts_job.py b/examples/podcast_reviews/extract_podcasts_job.py
index 858ea19..bdd66bf 100644
--- a/examples/podcast_reviews/extract_podcasts_job.py
+++ b/examples/podcast_reviews/extract_podcasts_job.py
@@ -3,19 +3,34 @@
 import sys
 import json
 import os
-import sqlite3
-import duckdb
 from datetime import datetime
 from pathlib import Path
 from typing import List, Dict, Any
+from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback

 def main():
+    # Write debug at the very start to see if main is called
+    debug_file = "/tmp/databuild_test/podcasts_main_debug.log"
+    try:
+        with open(debug_file, "w") as f:
+            f.write(f"main() called with sys.argv: {sys.argv}\n")
+            f.flush()
+    except:
+        pass
+
     if len(sys.argv) < 2:
         print("Usage: extract_podcasts_job.py {config|exec} [args...]", file=sys.stderr)
         sys.exit(1)

     command = sys.argv[1]
+    try:
+        with open(debug_file, "a") as f:
+            f.write(f"command: {command}\n")
+            f.flush()
+    except:
+        pass
+
     if command == "config":
         handle_config(sys.argv[2:])
     elif command == "exec":
@@ -40,7 +55,7 @@ def handle_config(args):
         "configs": [{
             "outputs": [{"str": partition_ref}],
             "inputs": [],
-            "args": [],
+            "args": ["all"],
             "env": {
                 "PARTITION_REF": partition_ref
             }
@@ -50,7 +65,19 @@ def handle_config(args):
     print(json.dumps(config))

 def handle_exec(args):
+    # Write debug info to a file since stdout might not be captured
+    debug_file = "/tmp/databuild_test/podcasts_debug.log"
+    with open(debug_file, "w") as f:
+        f.write(f"Starting extract_podcasts_job.exec with args: {args}\n")
+        f.flush()
+
+    print(f"Starting extract_podcasts_job.exec with args: {args}")
     partition_ref = os.getenv('PARTITION_REF', 'podcasts/all')
+    print(f"Partition ref: {partition_ref}")
+
+    with open(debug_file, "a") as f:
+        f.write(f"Partition ref: {partition_ref}\n")
+        f.flush()

     # Database paths
     db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
@@ -58,17 +85,26 @@ def handle_exec(args):
         # Fallback to relative path for development
         db_path = "data/ingest/database.sqlite"

+    print(f"Looking for database at: {db_path}")
+    print(f"Database exists: {os.path.exists(db_path)}")
+
     # Output path
     output_dir = Path("/tmp/databuild_test/examples/podcast_reviews/podcasts")
     output_dir.mkdir(parents=True, exist_ok=True)
     output_file = output_dir / "podcasts.parquet"

+    print(f"Output directory: {output_dir}")
+    print(f"Output file: {output_file}")
+
     try:
         # Extract all podcasts with their categories
-        extract_podcasts_with_categories(db_path, str(output_file))
+        print("Calling extract_podcasts_with_categories...")
+        result = extract_podcasts_with_categories(db_path, str(output_file))
+        print(f"extract_podcasts_with_categories returned: {result}")

         print(f"Successfully extracted podcast metadata")
         print(f"Output written to: {output_file}")
+        print(f"Output file exists: {output_file.exists()}")

         # Create manifest
         manifest = {
@@ -90,89 +126,120 @@ def handle_exec(args):
         manifest_file = output_dir / "manifest.json"
         with open(manifest_file, 'w') as f:
             json.dump(manifest, f, indent=2)
+
+        print(f"Manifest written to: {manifest_file}")

     except Exception as e:
         print(f"Error extracting podcasts: {e}", file=sys.stderr)
+        import traceback
+        traceback.print_exc()
         sys.exit(1)

 def extract_podcasts_with_categories(db_path: str, output_file: str):
     """Extract all podcasts with their categories and save as parquet."""
-    # Connect to SQLite
-    sqlite_conn = sqlite3.connect(db_path)
+    print(f"extract_podcasts_with_categories called with db_path={db_path}, output_file={output_file}")

-    # Connect to DuckDB for processing
-    duckdb_conn = duckdb.connect()
+    # Connect to DuckDB with extension handling
+    print("Creating DuckDB connection...")
+    duckdb_conn = create_duckdb_connection()
+    print("DuckDB connection created")

     try:
-        # Try to install and load parquet extension, but don't fail if it's already installed
+        # Use a simpler approach that works with SQLite fallback
         try:
-            duckdb_conn.execute("INSTALL parquet")
-        except Exception:
-            pass  # Extension might already be installed
-
-        duckdb_conn.execute("LOAD parquet")
-
-        # Query to get podcasts with categories (handling multiple categories per podcast)
-        query = """
-        WITH podcast_categories AS (
+            # Try complex DuckDB query first
+            query = """
+            WITH podcast_categories AS (
+                SELECT
+                    p.podcast_id,
+                    p.itunes_id,
+                    p.slug,
+                    p.itunes_url,
+                    p.title,
+                    c.category,
+                    ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category) as category_rank
+                FROM sqlite_scan(?, 'podcasts') p
+                LEFT JOIN sqlite_scan(?, 'categories') c ON p.podcast_id = c.podcast_id
+            ),
+            primary_categories AS (
+                SELECT
+                    podcast_id,
+                    itunes_id,
+                    slug,
+                    itunes_url,
+                    title,
+                    category as primary_category
+                FROM podcast_categories
+                WHERE category_rank = 1
+            ),
+            all_categories AS (
+                SELECT
+                    podcast_id,
+                    STRING_AGG(category, '|' ORDER BY category) as all_categories
+                FROM podcast_categories
+                WHERE category IS NOT NULL
+                GROUP BY podcast_id
+            )
             SELECT
-                p.podcast_id,
-                p.itunes_id,
-                p.slug,
-                p.itunes_url,
-                p.title,
-                c.category,
-                ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category) as category_rank
-            FROM sqlite_scan(?, 'podcasts') p
-            LEFT JOIN sqlite_scan(?, 'categories') c ON p.podcast_id = c.podcast_id
-        ),
-        primary_categories AS (
-            SELECT
-                podcast_id,
-                itunes_id,
-                slug,
-                itunes_url,
-                title,
-                category as primary_category
-            FROM podcast_categories
-            WHERE category_rank = 1
-        ),
-        all_categories AS (
-            SELECT
-                podcast_id,
-                STRING_AGG(category, '|' ORDER BY category) as all_categories
-            FROM podcast_categories
-            WHERE category IS NOT NULL
-            GROUP BY podcast_id
-        )
-        SELECT
-            pc.podcast_id,
-            pc.itunes_id,
-            pc.slug,
-            pc.itunes_url,
-            pc.title,
-            pc.primary_category,
-            COALESCE(ac.all_categories, pc.primary_category) as all_categories
-        FROM primary_categories pc
-        LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
-        ORDER BY pc.title
-        """
+                pc.podcast_id,
+                pc.itunes_id,
+                pc.slug,
+                pc.itunes_url,
+                pc.title,
+                pc.primary_category,
+                COALESCE(ac.all_categories, pc.primary_category) as all_categories
+            FROM primary_categories pc
+            LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
+            ORDER BY pc.title
+            """
+
+            df = duckdb_conn.execute(query, [db_path, db_path]).df()
+
+        except Exception as e:
+            print(f"DuckDB complex query failed: {e}, using pandas fallback")
+
+            # Fallback: Use pandas to process the data
+            import pandas as pd
+            import sqlite3
+
+            sqlite_conn = sqlite3.connect(db_path)
+            try:
+                # Read podcasts and categories separately
+                podcasts_df = pd.read_sql_query("SELECT * FROM podcasts", sqlite_conn)
+                categories_df = pd.read_sql_query("SELECT * FROM categories", sqlite_conn)
+
+                # Group categories by podcast_id
+                categories_grouped = categories_df.groupby('podcast_id')['category'].apply(
+                    lambda x: '|'.join(sorted(x))
+                ).reset_index()
+                categories_grouped.columns = ['podcast_id', 'all_categories']
+
+                # Get primary category (first alphabetically)
+                primary_categories = categories_df.sort_values('category').groupby('podcast_id').first().reset_index()
+                primary_categories = primary_categories[['podcast_id', 'category']].rename(columns={'category': 'primary_category'})
+
+                # Join everything together
+                df = podcasts_df.merge(primary_categories, on='podcast_id', how='left')
+                df = df.merge(categories_grouped, on='podcast_id', how='left')
+
+                # Fill missing values
+                df['primary_category'] = df['primary_category'].fillna('unknown')
+                df['all_categories'] = df['all_categories'].fillna(df['primary_category'])
+
+                # Sort by title
+                df = df.sort_values('title')
+
+            finally:
+                sqlite_conn.close()

-        # Execute query and save to parquet
-        duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)", [db_path, db_path])
+        # Save to parquet with fallback
+        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")

-        # Get row count for logging
-        count_result = duckdb_conn.execute(
-            "SELECT COUNT(*) FROM sqlite_scan(?, 'podcasts')",
-            [db_path]
-        ).fetchone()
-
-        row_count = count_result[0] if count_result else 0
+        row_count = len(df)

         print(f"Extracted {row_count} podcasts with category information")

     finally:
-        sqlite_conn.close()
         duckdb_conn.close()

 if __name__ == "__main__":
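The pandas fallback in `extract_podcasts_with_categories` mirrors the DuckDB window-function query: `ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category)` with `category_rank = 1` picks the alphabetically first category per podcast, which the fallback reproduces with a sort plus `groupby(...).first()`, and `STRING_AGG(category, '|' ORDER BY category)` corresponds to the sorted `'|'.join`. A small self-contained check of that equivalence on toy data (not from the repository):

```python
import pandas as pd

# Toy categories table: two podcasts, multiple categories for the first one.
categories_df = pd.DataFrame({
    "podcast_id": ["p1", "p1", "p2"],
    "category": ["news", "comedy", "tech"],
})

# Primary category: alphabetically first per podcast, as in the fallback above.
primary = (
    categories_df.sort_values("category")
    .groupby("podcast_id")
    .first()
    .reset_index()
    .rename(columns={"category": "primary_category"})
)

# All categories joined with '|' in sorted order, matching STRING_AGG(... ORDER BY category).
all_cats = (
    categories_df.groupby("podcast_id")["category"]
    .apply(lambda x: "|".join(sorted(x)))
    .reset_index(name="all_categories")
)

print(primary)   # p1 -> comedy, p2 -> tech
print(all_cats)  # p1 -> comedy|news, p2 -> tech
```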
FROM podcasts", sqlite_conn) + categories_df = pd.read_sql_query("SELECT * FROM categories", sqlite_conn) + + # Group categories by podcast_id + categories_grouped = categories_df.groupby('podcast_id')['category'].apply( + lambda x: '|'.join(sorted(x)) + ).reset_index() + categories_grouped.columns = ['podcast_id', 'all_categories'] + + # Get primary category (first alphabetically) + primary_categories = categories_df.sort_values('category').groupby('podcast_id').first().reset_index() + primary_categories = primary_categories[['podcast_id', 'category']].rename(columns={'category': 'primary_category'}) + + # Join everything together + df = podcasts_df.merge(primary_categories, on='podcast_id', how='left') + df = df.merge(categories_grouped, on='podcast_id', how='left') + + # Fill missing values + df['primary_category'] = df['primary_category'].fillna('unknown') + df['all_categories'] = df['all_categories'].fillna(df['primary_category']) + + # Sort by title + df = df.sort_values('title') + + finally: + sqlite_conn.close() - # Execute query and save to parquet - duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)", [db_path, db_path]) + # Save to parquet with fallback + save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet") - # Get row count for logging - count_result = duckdb_conn.execute( - "SELECT COUNT(*) FROM sqlite_scan(?, 'podcasts')", - [db_path] - ).fetchone() - - row_count = count_result[0] if count_result else 0 + row_count = len(df) print(f"Extracted {row_count} podcasts with category information") finally: - sqlite_conn.close() duckdb_conn.close() if __name__ == "__main__": diff --git a/examples/podcast_reviews/extract_reviews_job.py b/examples/podcast_reviews/extract_reviews_job.py index 045a027..905dd53 100644 --- a/examples/podcast_reviews/extract_reviews_job.py +++ b/examples/podcast_reviews/extract_reviews_job.py @@ -10,12 +10,28 @@ import re from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback def main(): + # Write debug info to understand what's being called + debug_file = "/tmp/databuild_test/reviews_main_debug.log" + try: + with open(debug_file, "w") as f: + f.write(f"main() called with sys.argv: {sys.argv}\n") + f.flush() + except: + pass + if len(sys.argv) < 2: print("Usage: extract_reviews_job.py {config|exec} [args...]", file=sys.stderr) sys.exit(1) command = sys.argv[1] + try: + with open(debug_file, "a") as f: + f.write(f"command: {command}\n") + f.flush() + except: + pass + if command == "config": handle_config(sys.argv[2:]) elif command == "exec":