Update podcast reviews example

This commit is contained in:
Stuart Axelbrooke 2025-07-02 21:37:55 -07:00
parent ce577c6335
commit 40c8b200ef
6 changed files with 121 additions and 80 deletions

View file

@ -242,13 +242,21 @@ fn worker(
}
fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
let mut missing_deps = Vec::new();
for dep in &task.config.inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
if !completed_outputs.contains(&dep.partition_ref.str) {
return false;
missing_deps.push(&dep.partition_ref.str);
}
}
}
if !missing_deps.is_empty() {
debug!("Task {} not ready - missing dependencies: {:?}", task.job_label, missing_deps);
return false;
}
true
}
@ -379,6 +387,34 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// 4. Periodic logging
if last_log_time.elapsed() >= LOG_INTERVAL {
log_status_summary(&task_states, &original_tasks_by_key);
// Debug: Check for deadlock (pending tasks with no running tasks)
let has_pending = task_states.values().any(|s| *s == TaskState::Pending);
if has_pending && active_tasks_count == 0 {
warn!("Potential deadlock detected: {} pending tasks with no running tasks",
task_states.values().filter(|s| **s == TaskState::Pending).count());
// Log details of pending tasks and their preconditions
for (key, state) in &task_states {
if *state == TaskState::Pending {
if let Some(task) = original_tasks_by_key.get(key) {
warn!("Pending task: {} ({})", task.job_label, key);
warn!(" Required inputs:");
for dep in &task.config.inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
let available = completed_outputs.contains(&dep.partition_ref.str);
warn!(" {} - {}", dep.partition_ref.str, if available { "AVAILABLE" } else { "MISSING" });
}
}
warn!(" Produces outputs:");
for output in &task.config.outputs {
warn!(" {}", output.str);
}
}
}
}
}
last_log_time = Instant::now();
}

View file

@ -36,22 +36,23 @@ def handle_config(args):
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
configs = []
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: reviews for the date and podcast metadata
reviews_ref = f"reviews/date={date_str}"
podcasts_ref = "podcasts/all"
config = {
"configs": [{
# Process each partition reference
for partition_ref in args:
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: reviews for the date and podcast metadata
reviews_ref = f"reviews/date={date_str}"
podcasts_ref = "podcasts/all"
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": reviews_ref}},
@ -63,9 +64,9 @@ def handle_config(args):
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
})
config = {"configs": configs}
print(json.dumps(config))
def handle_exec(args):

View file

@ -36,22 +36,23 @@ def handle_config(args):
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
configs = []
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase stats and categorized reviews for the category and date
phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
# Process each partition reference
for partition_ref in args:
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase stats and categorized reviews for the category and date
phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}},
@ -63,9 +64,9 @@ def handle_config(args):
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
})
config = {"configs": configs}
print(json.dumps(config))
def handle_exec(args):

View file

@ -52,17 +52,18 @@ def handle_config(args):
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
configs = []
try:
parsed = parse_partition_ref(partition_ref)
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
config = {
"configs": [{
# Process each partition reference
for partition_ref in args:
try:
parsed = parse_partition_ref(partition_ref)
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [],
"args": [date_str],
@ -70,9 +71,9 @@ def handle_config(args):
"PARTITION_REF": partition_ref,
"TARGET_DATE": date_str
}
}]
}
})
config = {"configs": configs}
print(json.dumps(config))
def handle_exec(args):

View file

@ -38,21 +38,22 @@ def handle_config(args):
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
configs = []
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: categorized reviews for the category and date
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
# Process each partition reference
for partition_ref in args:
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: categorized reviews for the category and date
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
@ -63,9 +64,9 @@ def handle_config(args):
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
})
config = {"configs": configs}
print(json.dumps(config))
def handle_exec(args):

View file

@ -36,22 +36,23 @@ def handle_config(args):
print("Config mode requires partition ref", file=sys.stderr)
sys.exit(1)
partition_ref = args[0]
configs = []
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase models and categorized reviews for the category and date
phrase_models_ref = f"phrase_models/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
config = {
"configs": [{
# Process each partition reference
for partition_ref in args:
try:
parsed = parse_partition_ref(partition_ref)
category = parsed["category"]
date_str = parsed["date"]
except ValueError as e:
print(f"Error parsing partition ref: {e}", file=sys.stderr)
sys.exit(1)
# Dependencies: phrase models and categorized reviews for the category and date
phrase_models_ref = f"phrase_models/category={category}/date={date_str}"
categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_models_ref}},
@ -63,9 +64,9 @@ def handle_config(args):
"TARGET_CATEGORY": category,
"TARGET_DATE": date_str
}
}]
}
})
config = {"configs": configs}
print(json.dumps(config))
def handle_exec(args):