diff --git a/databuild/graph/execute.rs b/databuild/graph/execute.rs index e711251..502b104 100644 --- a/databuild/graph/execute.rs +++ b/databuild/graph/execute.rs @@ -242,13 +242,21 @@ fn worker( } fn is_task_ready(task: &Task, completed_outputs: &HashSet) -> bool { + let mut missing_deps = Vec::new(); + for dep in &task.config.inputs { if dep.dep_type == 1 { // MATERIALIZE = 1 if !completed_outputs.contains(&dep.partition_ref.str) { - return false; + missing_deps.push(&dep.partition_ref.str); } } } + + if !missing_deps.is_empty() { + debug!("Task {} not ready - missing dependencies: {:?}", task.job_label, missing_deps); + return false; + } + true } @@ -379,6 +387,34 @@ fn main() -> Result<(), Box> { // 4. Periodic logging if last_log_time.elapsed() >= LOG_INTERVAL { log_status_summary(&task_states, &original_tasks_by_key); + + // Debug: Check for deadlock (pending tasks with no running tasks) + let has_pending = task_states.values().any(|s| *s == TaskState::Pending); + if has_pending && active_tasks_count == 0 { + warn!("Potential deadlock detected: {} pending tasks with no running tasks", + task_states.values().filter(|s| **s == TaskState::Pending).count()); + + // Log details of pending tasks and their preconditions + for (key, state) in &task_states { + if *state == TaskState::Pending { + if let Some(task) = original_tasks_by_key.get(key) { + warn!("Pending task: {} ({})", task.job_label, key); + warn!(" Required inputs:"); + for dep in &task.config.inputs { + if dep.dep_type == 1 { // MATERIALIZE = 1 + let available = completed_outputs.contains(&dep.partition_ref.str); + warn!(" {} - {}", dep.partition_ref.str, if available { "AVAILABLE" } else { "MISSING" }); + } + } + warn!(" Produces outputs:"); + for output in &task.config.outputs { + warn!(" {}", output.str); + } + } + } + } + } + last_log_time = Instant::now(); } diff --git a/examples/podcast_reviews/categorize_reviews_job.py b/examples/podcast_reviews/categorize_reviews_job.py index f1f9beb..c74c88d 100644 --- a/examples/podcast_reviews/categorize_reviews_job.py +++ b/examples/podcast_reviews/categorize_reviews_job.py @@ -36,22 +36,23 @@ def handle_config(args): print("Config mode requires partition ref", file=sys.stderr) sys.exit(1) - partition_ref = args[0] + configs = [] - try: - parsed = parse_partition_ref(partition_ref) - category = parsed["category"] - date_str = parsed["date"] - except ValueError as e: - print(f"Error parsing partition ref: {e}", file=sys.stderr) - sys.exit(1) - - # Dependencies: reviews for the date and podcast metadata - reviews_ref = f"reviews/date={date_str}" - podcasts_ref = "podcasts/all" - - config = { - "configs": [{ + # Process each partition reference + for partition_ref in args: + try: + parsed = parse_partition_ref(partition_ref) + category = parsed["category"] + date_str = parsed["date"] + except ValueError as e: + print(f"Error parsing partition ref: {e}", file=sys.stderr) + sys.exit(1) + + # Dependencies: reviews for the date and podcast metadata + reviews_ref = f"reviews/date={date_str}" + podcasts_ref = "podcasts/all" + + configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ {"dep_type": 1, "partition_ref": {"str": reviews_ref}}, @@ -63,9 +64,9 @@ def handle_config(args): "TARGET_CATEGORY": category, "TARGET_DATE": date_str } - }] - } + }) + config = {"configs": configs} print(json.dumps(config)) def handle_exec(args): diff --git a/examples/podcast_reviews/daily_summary_job.py b/examples/podcast_reviews/daily_summary_job.py index 5d5f312..4f4a796 100644 --- a/examples/podcast_reviews/daily_summary_job.py +++ b/examples/podcast_reviews/daily_summary_job.py @@ -36,22 +36,23 @@ def handle_config(args): print("Config mode requires partition ref", file=sys.stderr) sys.exit(1) - partition_ref = args[0] + configs = [] - try: - parsed = parse_partition_ref(partition_ref) - category = parsed["category"] - date_str = parsed["date"] - except ValueError as e: - print(f"Error parsing partition ref: {e}", file=sys.stderr) - sys.exit(1) - - # Dependencies: phrase stats and categorized reviews for the category and date - phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}" - categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" - - config = { - "configs": [{ + # Process each partition reference + for partition_ref in args: + try: + parsed = parse_partition_ref(partition_ref) + category = parsed["category"] + date_str = parsed["date"] + except ValueError as e: + print(f"Error parsing partition ref: {e}", file=sys.stderr) + sys.exit(1) + + # Dependencies: phrase stats and categorized reviews for the category and date + phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}" + categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" + + configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ {"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}}, @@ -63,9 +64,9 @@ def handle_config(args): "TARGET_CATEGORY": category, "TARGET_DATE": date_str } - }] - } + }) + config = {"configs": configs} print(json.dumps(config)) def handle_exec(args): diff --git a/examples/podcast_reviews/extract_reviews_job.py b/examples/podcast_reviews/extract_reviews_job.py index 905dd53..245db78 100644 --- a/examples/podcast_reviews/extract_reviews_job.py +++ b/examples/podcast_reviews/extract_reviews_job.py @@ -52,17 +52,18 @@ def handle_config(args): print("Config mode requires partition ref", file=sys.stderr) sys.exit(1) - partition_ref = args[0] + configs = [] - try: - parsed = parse_partition_ref(partition_ref) - date_str = parsed["date"] - except ValueError as e: - print(f"Error parsing partition ref: {e}", file=sys.stderr) - sys.exit(1) - - config = { - "configs": [{ + # Process each partition reference + for partition_ref in args: + try: + parsed = parse_partition_ref(partition_ref) + date_str = parsed["date"] + except ValueError as e: + print(f"Error parsing partition ref: {e}", file=sys.stderr) + sys.exit(1) + + configs.append({ "outputs": [{"str": partition_ref}], "inputs": [], "args": [date_str], @@ -70,9 +71,9 @@ def handle_config(args): "PARTITION_REF": partition_ref, "TARGET_DATE": date_str } - }] - } + }) + config = {"configs": configs} print(json.dumps(config)) def handle_exec(args): diff --git a/examples/podcast_reviews/phrase_modeling_job.py b/examples/podcast_reviews/phrase_modeling_job.py index 67f9fb5..7c528fc 100644 --- a/examples/podcast_reviews/phrase_modeling_job.py +++ b/examples/podcast_reviews/phrase_modeling_job.py @@ -38,21 +38,22 @@ def handle_config(args): print("Config mode requires partition ref", file=sys.stderr) sys.exit(1) - partition_ref = args[0] + configs = [] - try: - parsed = parse_partition_ref(partition_ref) - category = parsed["category"] - date_str = parsed["date"] - except ValueError as e: - print(f"Error parsing partition ref: {e}", file=sys.stderr) - sys.exit(1) - - # Dependencies: categorized reviews for the category and date - categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" - - config = { - "configs": [{ + # Process each partition reference + for partition_ref in args: + try: + parsed = parse_partition_ref(partition_ref) + category = parsed["category"] + date_str = parsed["date"] + except ValueError as e: + print(f"Error parsing partition ref: {e}", file=sys.stderr) + sys.exit(1) + + # Dependencies: categorized reviews for the category and date + categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" + + configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}} @@ -63,9 +64,9 @@ def handle_config(args): "TARGET_CATEGORY": category, "TARGET_DATE": date_str } - }] - } + }) + config = {"configs": configs} print(json.dumps(config)) def handle_exec(args): diff --git a/examples/podcast_reviews/phrase_stats_job.py b/examples/podcast_reviews/phrase_stats_job.py index 4047322..5fe4f64 100644 --- a/examples/podcast_reviews/phrase_stats_job.py +++ b/examples/podcast_reviews/phrase_stats_job.py @@ -36,22 +36,23 @@ def handle_config(args): print("Config mode requires partition ref", file=sys.stderr) sys.exit(1) - partition_ref = args[0] + configs = [] - try: - parsed = parse_partition_ref(partition_ref) - category = parsed["category"] - date_str = parsed["date"] - except ValueError as e: - print(f"Error parsing partition ref: {e}", file=sys.stderr) - sys.exit(1) - - # Dependencies: phrase models and categorized reviews for the category and date - phrase_models_ref = f"phrase_models/category={category}/date={date_str}" - categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" - - config = { - "configs": [{ + # Process each partition reference + for partition_ref in args: + try: + parsed = parse_partition_ref(partition_ref) + category = parsed["category"] + date_str = parsed["date"] + except ValueError as e: + print(f"Error parsing partition ref: {e}", file=sys.stderr) + sys.exit(1) + + # Dependencies: phrase models and categorized reviews for the category and date + phrase_models_ref = f"phrase_models/category={category}/date={date_str}" + categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}" + + configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ {"dep_type": 1, "partition_ref": {"str": phrase_models_ref}}, @@ -63,9 +64,9 @@ def handle_config(args): "TARGET_CATEGORY": category, "TARGET_DATE": date_str } - }] - } + }) + config = {"configs": configs} print(json.dumps(config)) def handle_exec(args):