212 lines
No EOL
7.6 KiB
Python
Executable file
212 lines
No EOL
7.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Basic tests for podcast reviews jobs.
|
|
Tests the configuration and basic execution flow of each job.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import tempfile
|
|
import shutil
|
|
import json
|
|
from pathlib import Path
|
|
|
|
def run_job_config(job_script: str, partition_ref: str):
|
|
"""Run a job in config mode and return the parsed config."""
|
|
cmd = [sys.executable, job_script, "config", partition_ref]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
raise Exception(f"Config failed for {job_script}: {result.stderr}")
|
|
|
|
return json.loads(result.stdout)
|
|
|
|
def test_extract_reviews_job():
|
|
"""Test extract_reviews_job configuration."""
|
|
print("Testing extract_reviews_job...")
|
|
|
|
config = run_job_config("extract_reviews_job.py", "reviews/date=2020-01-01")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "reviews/date=2020-01-01"}]
|
|
assert job_config["inputs"] == []
|
|
assert job_config["args"] == ["2020-01-01"]
|
|
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
|
|
|
|
print("✓ extract_reviews_job config test passed")
|
|
|
|
def test_extract_podcasts_job():
|
|
"""Test extract_podcasts_job configuration."""
|
|
print("Testing extract_podcasts_job...")
|
|
|
|
config = run_job_config("extract_podcasts_job.py", "podcasts/all")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "podcasts/all"}]
|
|
assert job_config["inputs"] == []
|
|
assert job_config["args"] == []
|
|
assert job_config["env"]["PARTITION_REF"] == "podcasts/all"
|
|
|
|
print("✓ extract_podcasts_job config test passed")
|
|
|
|
def test_categorize_reviews_job():
|
|
"""Test categorize_reviews_job configuration."""
|
|
print("Testing categorize_reviews_job...")
|
|
|
|
config = run_job_config("categorize_reviews_job.py", "categorized_reviews/category=comedy/date=2020-01-01")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "categorized_reviews/category=comedy/date=2020-01-01"}]
|
|
assert len(job_config["inputs"]) == 2
|
|
|
|
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
|
|
assert "reviews/date=2020-01-01" in input_refs
|
|
assert "podcasts/all" in input_refs
|
|
|
|
assert job_config["args"] == ["comedy", "2020-01-01"]
|
|
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
|
|
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
|
|
|
|
print("✓ categorize_reviews_job config test passed")
|
|
|
|
def test_phrase_modeling_job():
|
|
"""Test phrase_modeling_job configuration."""
|
|
print("Testing phrase_modeling_job...")
|
|
|
|
config = run_job_config("phrase_modeling_job.py", "phrase_models/category=comedy/date=2020-01-01")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "phrase_models/category=comedy/date=2020-01-01"}]
|
|
assert len(job_config["inputs"]) == 1
|
|
assert job_config["inputs"][0]["partition_ref"]["str"] == "categorized_reviews/category=comedy/date=2020-01-01"
|
|
|
|
assert job_config["args"] == ["comedy", "2020-01-01"]
|
|
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
|
|
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
|
|
|
|
print("✓ phrase_modeling_job config test passed")
|
|
|
|
def test_phrase_stats_job():
|
|
"""Test phrase_stats_job configuration."""
|
|
print("Testing phrase_stats_job...")
|
|
|
|
config = run_job_config("phrase_stats_job.py", "phrase_stats/category=comedy/date=2020-01-01")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "phrase_stats/category=comedy/date=2020-01-01"}]
|
|
assert len(job_config["inputs"]) == 2
|
|
|
|
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
|
|
assert "phrase_models/category=comedy/date=2020-01-01" in input_refs
|
|
assert "categorized_reviews/category=comedy/date=2020-01-01" in input_refs
|
|
|
|
assert job_config["args"] == ["comedy", "2020-01-01"]
|
|
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
|
|
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
|
|
|
|
print("✓ phrase_stats_job config test passed")
|
|
|
|
def test_daily_summary_job():
|
|
"""Test daily_summary_job configuration."""
|
|
print("Testing daily_summary_job...")
|
|
|
|
config = run_job_config("daily_summary_job.py", "daily_summaries/category=comedy/date=2020-01-01")
|
|
|
|
assert len(config["configs"]) == 1
|
|
job_config = config["configs"][0]
|
|
|
|
assert job_config["outputs"] == [{"str": "daily_summaries/category=comedy/date=2020-01-01"}]
|
|
assert len(job_config["inputs"]) == 2
|
|
|
|
input_refs = [inp["partition_ref"]["str"] for inp in job_config["inputs"]]
|
|
assert "phrase_stats/category=comedy/date=2020-01-01" in input_refs
|
|
assert "categorized_reviews/category=comedy/date=2020-01-01" in input_refs
|
|
|
|
assert job_config["args"] == ["comedy", "2020-01-01"]
|
|
assert job_config["env"]["TARGET_CATEGORY"] == "comedy"
|
|
assert job_config["env"]["TARGET_DATE"] == "2020-01-01"
|
|
|
|
print("✓ daily_summary_job config test passed")
|
|
|
|
def test_job_lookup():
|
|
"""Test job_lookup functionality."""
|
|
print("Testing job_lookup...")
|
|
|
|
test_cases = [
|
|
("reviews/date=2020-01-01", ":extract_reviews_job"),
|
|
("podcasts/all", ":extract_podcasts_job"),
|
|
("categorized_reviews/category=comedy/date=2020-01-01", ":categorize_reviews_job"),
|
|
("phrase_models/category=comedy/date=2020-01-01", ":phrase_modeling_job"),
|
|
("phrase_stats/category=comedy/date=2020-01-01", ":phrase_stats_job"),
|
|
("daily_summaries/category=comedy/date=2020-01-01", ":daily_summary_job"),
|
|
]
|
|
|
|
for partition_ref, expected_job_label in test_cases:
|
|
cmd = [sys.executable, "job_lookup.py", partition_ref]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
raise Exception(f"Job lookup failed for {partition_ref}: {result.stderr}")
|
|
|
|
response = json.loads(result.stdout)
|
|
# New format: {job_label: [partition_refs]}
|
|
assert expected_job_label in response
|
|
assert partition_ref in response[expected_job_label]
|
|
|
|
print("✓ job_lookup test passed")
|
|
|
|
def test_invalid_partition_refs():
|
|
"""Test that invalid partition refs are handled properly."""
|
|
print("Testing invalid partition refs...")
|
|
|
|
invalid_refs = [
|
|
"invalid/ref",
|
|
"reviews/date=invalid-date",
|
|
"categorized_reviews/category=/date=2020-01-01", # missing category
|
|
"unknown/partition=ref",
|
|
]
|
|
|
|
for invalid_ref in invalid_refs:
|
|
cmd = [sys.executable, "job_lookup.py", invalid_ref]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
# Should fail for invalid refs
|
|
assert result.returncode != 0, f"Expected failure for invalid ref: {invalid_ref}"
|
|
|
|
print("✓ invalid partition refs test passed")
|
|
|
|
def main():
|
|
"""Run all tests."""
|
|
print("Running podcast reviews job tests...")
|
|
print("=" * 50)
|
|
|
|
try:
|
|
test_extract_reviews_job()
|
|
test_extract_podcasts_job()
|
|
test_categorize_reviews_job()
|
|
test_phrase_modeling_job()
|
|
test_phrase_stats_job()
|
|
test_daily_summary_job()
|
|
test_job_lookup()
|
|
test_invalid_partition_refs()
|
|
|
|
print("=" * 50)
|
|
print("All tests passed! ✅")
|
|
|
|
except Exception as e:
|
|
print(f"Test failed: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |