databuild/examples/podcast_reviews/test_jobs.py
2025-06-30 22:15:48 -07:00

212 lines
No EOL
7.6 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Basic tests for podcast reviews jobs.
Tests the configuration and basic execution flow of each job.
"""
import os
import sys
import subprocess
import tempfile
import shutil
import json
from pathlib import Path
def run_job_config(job_script: str, partition_ref: str):
    """Run a job script in ``config`` mode and return its parsed JSON output.

    The script is launched as ``<python> <job_script> config <partition_ref>``
    using the same interpreter that is running this file.

    Args:
        job_script: Path of the job script handed to the interpreter.
        partition_ref: Partition reference passed as the last argument.

    Returns:
        Whatever ``json.loads`` decodes from the script's stdout.

    Raises:
        Exception: The script exited with a non-zero status. The message
            carries the exit code and the captured stderr.
        json.JSONDecodeError: The script exited successfully but its stdout
            was not valid JSON. The message names the offending script.
    """
    cmd = [sys.executable, job_script, "config", partition_ref]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Include the exit code: stderr may be empty, which would otherwise
        # leave the caller with no clue about what went wrong.
        raise Exception(
            f"Config failed for {job_script} "
            f"(exit code {result.returncode}): {result.stderr}"
        )
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as err:
        # Re-raise the same exception type so existing handlers keep working,
        # but say which job produced the unparsable output.
        raise json.JSONDecodeError(
            f"Config output of {job_script} is not valid JSON ({err.msg})",
            err.doc,
            err.pos,
        ) from err
def test_extract_reviews_job():
    """Check the config that extract_reviews_job.py emits for one review date."""
    print("Testing extract_reviews_job...")

    target_ref = "reviews/date=2020-01-01"
    response = run_job_config("extract_reviews_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]

    # The requested partition is the only output and no inputs are declared.
    assert cfg["outputs"] == [{"str": target_ref}]
    assert cfg["inputs"] == []

    # The date appears both as the sole argument and in TARGET_DATE.
    assert cfg["args"] == ["2020-01-01"]
    env = cfg["env"]
    assert env["TARGET_DATE"] == "2020-01-01"

    print("✓ extract_reviews_job config test passed")
def test_extract_podcasts_job():
    """Check the config that extract_podcasts_job.py emits for ``podcasts/all``."""
    print("Testing extract_podcasts_job...")

    target_ref = "podcasts/all"
    response = run_job_config("extract_podcasts_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]

    # The requested partition is the only output; no inputs or args are expected.
    assert cfg["outputs"] == [{"str": target_ref}]
    assert cfg["inputs"] == []
    assert cfg["args"] == []

    # The partition ref itself is forwarded through the environment.
    env = cfg["env"]
    assert env["PARTITION_REF"] == "podcasts/all"

    print("✓ extract_podcasts_job config test passed")
def test_categorize_reviews_job():
    """Check the config that categorize_reviews_job.py emits for comedy on 2020-01-01."""
    print("Testing categorize_reviews_job...")

    target_ref = "categorized_reviews/category=comedy/date=2020-01-01"
    response = run_job_config("categorize_reviews_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]
    assert cfg["outputs"] == [{"str": target_ref}]

    # Two inputs are expected; their order is not checked.
    assert len(cfg["inputs"]) == 2
    seen_inputs = []
    for entry in cfg["inputs"]:
        seen_inputs.append(entry["partition_ref"]["str"])
    assert "reviews/date=2020-01-01" in seen_inputs
    assert "podcasts/all" in seen_inputs

    # Category and date appear as positional args and as environment variables.
    assert cfg["args"] == ["comedy", "2020-01-01"]
    env = cfg["env"]
    assert env["TARGET_CATEGORY"] == "comedy"
    assert cfg["env"]["TARGET_DATE"] == "2020-01-01"

    print("✓ categorize_reviews_job config test passed")
def test_phrase_modeling_job():
    """Check the config that phrase_modeling_job.py emits for comedy on 2020-01-01."""
    print("Testing phrase_modeling_job...")

    target_ref = "phrase_models/category=comedy/date=2020-01-01"
    response = run_job_config("phrase_modeling_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]
    assert cfg["outputs"] == [{"str": target_ref}]

    # The only input is the categorized-reviews partition for the same
    # category and date.
    assert len(cfg["inputs"]) == 1
    sole_input = cfg["inputs"][0]
    assert sole_input["partition_ref"]["str"] == "categorized_reviews/category=comedy/date=2020-01-01"

    # Category and date appear as positional args and as environment variables.
    assert cfg["args"] == ["comedy", "2020-01-01"]
    env = cfg["env"]
    assert env["TARGET_CATEGORY"] == "comedy"
    assert cfg["env"]["TARGET_DATE"] == "2020-01-01"

    print("✓ phrase_modeling_job config test passed")
def test_phrase_stats_job():
    """Check the config that phrase_stats_job.py emits for comedy on 2020-01-01."""
    print("Testing phrase_stats_job...")

    target_ref = "phrase_stats/category=comedy/date=2020-01-01"
    response = run_job_config("phrase_stats_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]
    assert cfg["outputs"] == [{"str": target_ref}]

    # Two inputs are expected; their order is not checked.
    assert len(cfg["inputs"]) == 2
    seen_inputs = []
    for entry in cfg["inputs"]:
        seen_inputs.append(entry["partition_ref"]["str"])
    assert "phrase_models/category=comedy/date=2020-01-01" in seen_inputs
    assert "categorized_reviews/category=comedy/date=2020-01-01" in seen_inputs

    # Category and date appear as positional args and as environment variables.
    assert cfg["args"] == ["comedy", "2020-01-01"]
    env = cfg["env"]
    assert env["TARGET_CATEGORY"] == "comedy"
    assert cfg["env"]["TARGET_DATE"] == "2020-01-01"

    print("✓ phrase_stats_job config test passed")
def test_daily_summary_job():
    """Check the config that daily_summary_job.py emits for comedy on 2020-01-01."""
    print("Testing daily_summary_job...")

    target_ref = "daily_summaries/category=comedy/date=2020-01-01"
    response = run_job_config("daily_summary_job.py", target_ref)

    # A single requested partition must yield exactly one job config.
    assert len(response["configs"]) == 1
    cfg = response["configs"][0]
    assert cfg["outputs"] == [{"str": target_ref}]

    # Two inputs are expected; their order is not checked.
    assert len(cfg["inputs"]) == 2
    seen_inputs = []
    for entry in cfg["inputs"]:
        seen_inputs.append(entry["partition_ref"]["str"])
    assert "phrase_stats/category=comedy/date=2020-01-01" in seen_inputs
    assert "categorized_reviews/category=comedy/date=2020-01-01" in seen_inputs

    # Category and date appear as positional args and as environment variables.
    assert cfg["args"] == ["comedy", "2020-01-01"]
    env = cfg["env"]
    assert env["TARGET_CATEGORY"] == "comedy"
    assert cfg["env"]["TARGET_DATE"] == "2020-01-01"

    print("✓ daily_summary_job config test passed")
def test_job_lookup():
    """Check that job_lookup.py maps each known partition ref to its job label.

    job_lookup.py is run once per partition ref. Its stdout is parsed as a
    JSON object of the form ``{job_label: [partition_refs]}``.

    Raises:
        Exception: job_lookup.py exited with a non-zero status.
        AssertionError: The expected label is missing from the response, or
            the partition ref is not listed under it.
    """
    print("Testing job_lookup...")

    # (partition ref, job label expected to own it)
    expectations = (
        ("reviews/date=2020-01-01", ":extract_reviews_job"),
        ("podcasts/all", ":extract_podcasts_job"),
        ("categorized_reviews/category=comedy/date=2020-01-01", ":categorize_reviews_job"),
        ("phrase_models/category=comedy/date=2020-01-01", ":phrase_modeling_job"),
        ("phrase_stats/category=comedy/date=2020-01-01", ":phrase_stats_job"),
        ("daily_summaries/category=comedy/date=2020-01-01", ":daily_summary_job"),
    )

    for ref, label in expectations:
        completed = subprocess.run(
            [sys.executable, "job_lookup.py", ref],
            capture_output=True,
            text=True,
        )
        if completed.returncode:
            raise Exception(f"Job lookup failed for {ref}: {completed.stderr}")

        mapping = json.loads(completed.stdout)
        # The label must be present, and the ref must be listed under it.
        assert label in mapping
        assert ref in mapping[label]

    print("✓ job_lookup test passed")
def test_invalid_partition_refs():
    """Check that job_lookup.py exits non-zero for partition refs it should reject.

    Raises:
        FileNotFoundError: job_lookup.py is not present in the current
            working directory.
        AssertionError: job_lookup.py exited with status 0 for one of the
            invalid refs.
    """
    print("Testing invalid partition refs...")

    lookup_script = "job_lookup.py"
    # If the script is missing, the interpreter itself exits non-zero, and
    # every case below would "pass" without job_lookup.py ever running.
    # Fail loudly instead of reporting a vacuous success.
    if not Path(lookup_script).is_file():
        raise FileNotFoundError(
            f"{lookup_script} not found in {Path.cwd()}; cannot test invalid refs"
        )

    invalid_refs = [
        "invalid/ref",
        "reviews/date=invalid-date",
        "categorized_reviews/category=/date=2020-01-01",  # missing category
        "unknown/partition=ref",
    ]
    for invalid_ref in invalid_refs:
        cmd = [sys.executable, lookup_script, invalid_ref]
        result = subprocess.run(cmd, capture_output=True, text=True)
        # Should fail for invalid refs
        assert result.returncode != 0, f"Expected failure for invalid ref: {invalid_ref}"

    print("✓ invalid partition refs test passed")
def main():
    """Run all tests in order, stopping at the first failure.

    On success a summary line is printed. On the first failing test, the
    test's name, the exception and a traceback are printed, and the process
    exits with status 1.
    """
    import traceback  # only needed on the failure path

    tests = (
        test_extract_reviews_job,
        test_extract_podcasts_job,
        test_categorize_reviews_job,
        test_phrase_modeling_job,
        test_phrase_stats_job,
        test_daily_summary_job,
        test_job_lookup,
        test_invalid_partition_refs,
    )

    print("Running podcast reviews job tests...")
    print("=" * 50)
    for test in tests:
        try:
            test()
        except Exception as e:
            # Most asserts in this file have no message, so str(e) is often
            # empty. Name the test, show the exception repr, and print the
            # traceback so the failing line can be found.
            print(f"Test failed: {test.__name__}: {e!r}")
            traceback.print_exc()
            sys.exit(1)
    print("=" * 50)
    print("All tests passed! ✅")


if __name__ == "__main__":
    main()