// databuild/databuild/graph/execute.rs
use databuild::{JobGraph, Task, JobStatus, BuildRequestStatus, BuildRequestStatusCode, PartitionStatus, BuildRequestEvent, JobEvent, PartitionEvent, PartitionRef};
use databuild::event_log::{create_bel_query_engine, create_build_event};
use databuild::build_event::EventType;
use databuild::log_collector::{LogCollector, LogCollectorError};
use crossbeam_channel::{Receiver, Sender};
use log::{debug, error, info, warn};
use std::collections::{HashMap, HashSet};
use std::io::{BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::{env, thread};
use std::time::{Duration, Instant};
// Command line parsing removed - using environment variables
use uuid::Uuid;
const NUM_WORKERS: usize = 4;
const LOG_INTERVAL: Duration = Duration::from_secs(5);
const FAIL_FAST: bool = true; // Same default as the Go version
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState {
Pending,
Running,
Succeeded,
Failed,
}
#[derive(Debug, Clone)]
struct TaskExecutionResult {
task_key: String,
job_label: String, // For logging
success: bool,
stdout: String,
stderr: String,
duration: Duration,
error_message: Option<String>,
}
/// Generates a unique key for a task from its job label and its input and
/// output partition references. Mirrors the Go implementation's getTaskKey.
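///
/// A hypothetical task with one input and one output yields a key of roughly
/// this shape (the label and partition refs are made up for illustration):
///
/// ```text
/// //jobs:daily_report|input:events/2024-01-01|output:reports/2024-01-01
/// ```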
fn get_task_key(task: &Task) -> String {
let mut key_parts = Vec::new();
key_parts.push(task.job.as_ref().unwrap().label.clone());
for input_dep in &task.config.as_ref().unwrap().inputs {
key_parts.push(format!("input:{}", input_dep.partition_ref.as_ref().unwrap().str));
}
for output_ref in &task.config.as_ref().unwrap().outputs {
key_parts.push(format!("output:{}", output_ref.str));
}
key_parts.join("|")
}
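/// Worker loop: receives tasks from the scheduler, resolves each job's
/// executable from the DATABUILD_CANDIDATE_JOBS_EXEC map, pipes the serialized
/// task config to the child's stdin, streams the child's stdout through the
/// LogCollector, and sends a TaskExecutionResult back for every task received.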
fn worker(
task_rx: Receiver<Arc<Task>>,
result_tx: Sender<TaskExecutionResult>,
worker_id: usize,
) {
info!("[Worker {}] Starting", worker_id);
while let Ok(task) = task_rx.recv() {
let task_key = get_task_key(&task);
info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key);
let start_time = Instant::now();
// The candidate-job map comes from the parent process; a missing or malformed
// value is unrecoverable for this worker, so fail loudly with context.
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS_EXEC")
.unwrap_or_else(|e| panic!("Failed to get DATABUILD_CANDIDATE_JOBS_EXEC: {}", e));
let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
.unwrap_or_else(|e| panic!("Failed to parse DATABUILD_CANDIDATE_JOBS_EXEC: {}", e));
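// The map associates job labels with executable paths; a hypothetical entry:
//   {"//jobs:daily_report": "/path/to/daily_report_binary"}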
// Look up the executable path for this job
let job_label = &task.job.as_ref().unwrap().label;
let exec_path = job_path_map.get(job_label)
.unwrap_or_else(|| panic!("Job {} is not a candidate job", job_label));
let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) {
Ok(json) => json,
Err(e) => {
let err_msg = format!("Failed to serialize task config for {}: {}", task.job.as_ref().unwrap().label, e);
error!("[Worker {}] {}", worker_id, err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send error result: {}", worker_id, e));
continue;
}
};
// Generate a job run ID for this execution
let job_run_id = Uuid::new_v4().to_string();
info!("Running job {} (Path: {}) with config: {}", job_label, exec_path, config_json);
let mut cmd = Command::new(&exec_path);
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
// The child inherits this process's environment by default, which mirrors the
// Go `cmd.Env = os.Environ()` behavior.
// Task-specific env vars from task.config.env are passed via JSON through stdin.
// Add the job run ID so the job wrapper can use the same ID
cmd.env("DATABUILD_JOB_RUN_ID", &job_run_id);
match cmd.spawn() {
Ok(mut child) => {
if let Some(mut child_stdin) = child.stdin.take() {
if let Err(e) = child_stdin.write_all(config_json.as_bytes()) {
let err_msg = format!("[Worker {}] Failed to write to stdin for {}: {}", worker_id, task.job.as_ref().unwrap().label, e);
error!("{}", err_msg);
// Ensure child is killed if stdin write fails before wait
let _ = child.kill();
let _ = child.wait(); // Reap the child
result_tx.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send error result: {}", worker_id, e));
continue;
}
drop(child_stdin); // Close stdin to signal EOF to the child
} else {
let err_msg = format!("[Worker {}] Failed to get stdin for {}", worker_id, task.job.as_ref().unwrap().label);
error!("{}", err_msg);
result_tx.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send error result: {}", worker_id, e));
continue;
}
// Initialize log collector
let mut log_collector = match LogCollector::new(LogCollector::default_logs_dir()) {
Ok(mut collector) => {
// Set the job label mapping for this job run
collector.set_job_label(&job_run_id, &task.job.as_ref().unwrap().label);
collector
},
Err(e) => {
let err_msg = format!("[Worker {}] Failed to initialize log collector for {}: {}",
worker_id, task.job.as_ref().unwrap().label, e);
error!("{}", err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send error result: {}", worker_id, e));
continue;
}
};
// Collect stdout/stderr and process with LogCollector.
// Stderr is drained on its own thread so a full stderr pipe cannot block the
// child while we are still consuming its stdout.
let stdout_handle = child.stdout.take();
let stderr_thread = child.stderr.take().map(|mut stderr| {
thread::spawn(move || {
let mut content = String::new();
if let Err(e) = stderr.read_to_string(&mut content) {
warn!("[Worker {}] Failed to read stderr: {}", worker_id, e);
}
content
})
});
// Read stdout and process with LogCollector
if let Some(stdout) = stdout_handle {
let stdout_reader = BufReader::new(stdout);
if let Err(e) = log_collector.consume_job_output(&job_run_id, stdout_reader) {
warn!("[Worker {}] Failed to process job logs for {}: {}",
worker_id, task.job.as_ref().unwrap().label, e);
}
}
// Join the stderr reader (raw, not structured)
let stderr_content = stderr_thread
.and_then(|handle| handle.join().ok())
.unwrap_or_default();
// Wait for the process to finish
match child.wait() {
Ok(status) => {
let duration = start_time.elapsed();
let success = status.success();
// Close the log collector for this job
if let Err(e) = log_collector.close_job(&job_run_id) {
warn!("[Worker {}] Failed to close log collector for {}: {}",
worker_id, task.job.as_ref().unwrap().label, e);
}
if success {
info!(
"[Worker {}] Job succeeded: {} (Duration: {:?}, Job Run ID: {})",
worker_id, task.job.as_ref().unwrap().label, duration, job_run_id
);
} else {
error!(
"[Worker {}] Job failed: {} (Duration: {:?}, Status: {:?}, Job Run ID: {})\nStderr: {}",
worker_id, task.job.as_ref().unwrap().label, duration, status, job_run_id, stderr_content
);
}
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success,
stdout: format!("Job logs written to JSONL (Job Run ID: {})", job_run_id),
stderr: stderr_content,
duration,
error_message: if success { None } else { Some(format!("Exited with status: {:?}", status)) },
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send result: {}", worker_id, e));
}
Err(e) => {
let err_msg = format!("[Worker {}] Failed to execute or wait for {}: {}", worker_id, task.job.as_ref().unwrap().label, e);
error!("{}", err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send execution error result: {}", worker_id, e));
}
}
}
Err(e) => {
let err_msg = format!("[Worker {}] Failed to spawn command for {}: {} (Path: {:?})", worker_id, task.job.as_ref().unwrap().label, e, exec_path);
error!("{}", err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
duration: start_time.elapsed(),
error_message: Some(err_msg),
})
.unwrap_or_else(|e| error!("[Worker {}] Failed to send spawn error result: {}", worker_id, e));
}
}
}
info!("[Worker {}] Exiting", worker_id);
}
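/// Returns true once every MATERIALIZE input of `task` is present in
/// `completed_outputs`; only MATERIALIZE dependencies (dep_type_code == 1)
/// gate scheduling. For example (hypothetical ref), a task with a MATERIALIZE
/// input on "events/2024-01-01" stays Pending until an upstream task that
/// produces that partition has succeeded.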
fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
let mut missing_deps = Vec::new();
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type_code == 1 { // MATERIALIZE = 1
if !completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str) {
missing_deps.push(&dep.partition_ref.as_ref().unwrap().str);
}
}
}
if !missing_deps.is_empty() {
debug!("Task {} not ready - missing dependencies: {:?}", task.job.as_ref().unwrap().label, missing_deps);
return false;
}
true
}
/// Checks whether this task's output partitions are already available or are
/// being built by another build request.
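///
/// Returns `(should_build, skipped_as_available, available_partitions)`:
/// - `(true, false, ..)`: at least one output still has to be built by this request.
/// - `(false, true, ..)`: every output is already available; the caller records
///   delegation-to-historical-build events and marks the task as succeeded.
/// - `(false, false, ..)`: an output is being built by another active request,
///   so this request delegates instead of building it again.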
async fn check_build_coordination(
task: &Task,
query_engine: &Arc<databuild::event_log::query_engine::BELQueryEngine>,
build_request_id: &str
) -> Result<(bool, bool, Vec<(PartitionRef, String)>), String> {
let outputs = &task.config.as_ref().unwrap().outputs;
let mut available_partitions = Vec::new();
let mut needs_building = false;
for output_ref in outputs {
debug!("Checking build coordination for partition: {}", output_ref.str);
// First check if this partition is already available
match query_engine.get_latest_partition_status(&output_ref.str).await {
Ok(Some((status, _timestamp))) => {
debug!("Partition {} has status: {:?}", output_ref.str, status);
if status == databuild::PartitionStatus::PartitionAvailable {
// Get which build request created this partition
match query_engine.get_build_request_for_available_partition(&output_ref.str).await {
Ok(Some(source_build_id)) => {
info!("Partition {} already available from build {}", output_ref.str, source_build_id);
available_partitions.push((output_ref.clone(), source_build_id));
continue;
}
Ok(None) => {
error!("Partition {} is available but no source build found - this indicates a bug in the event log implementation", output_ref.str);
return Err(format!("Available partition {} has no source build ID. This suggests the event log is missing required data.", output_ref.str));
}
Err(e) => {
error!("Failed to get source build for partition {}: {}", output_ref.str, e);
return Err(format!("Cannot determine source build for available partition {}: {}", output_ref.str, e));
}
}
} else {
debug!("Partition {} has non-available status {:?}, needs building", output_ref.str, status);
needs_building = true;
}
}
Ok(None) => {
debug!("Partition {} has no status, needs building", output_ref.str);
needs_building = true;
}
Err(e) => {
error!("Failed to check partition status for {}: {}", output_ref.str, e);
return Err(format!("Cannot check partition status: {}. Use a queryable event log (e.g., SQLite) for builds that need to check existing partitions.", e));
}
}
// Check if this partition is being built by another request
match query_engine.get_active_builds_for_partition(&output_ref.str).await {
Ok(active_builds) => {
let other_builds: Vec<String> = active_builds.into_iter()
.filter(|id| id != build_request_id)
.collect();
if !other_builds.is_empty() {
info!("Partition {} is already being built by other requests: {:?}. Delegating.",
output_ref.str, other_builds);
// Log delegation event for active builds
for delegated_to_build_id in &other_builds {
let event = create_build_event(
build_request_id.to_string(),
EventType::DelegationEvent(databuild::DelegationEvent {
partition_ref: Some(output_ref.clone()),
delegated_to_build_request_id: delegated_to_build_id.clone(),
message: "Delegated to active build during execution".to_string(),
})
);
if let Err(e) = query_engine.append_event(event).await {
error!("Failed to log delegation event: {}", e);
}
}
return Ok((false, false, available_partitions)); // Don't build, delegated to active build
}
}
Err(e) => {
error!("Failed to check active builds for partition {}: {}", output_ref.str, e);
return Err(format!("Cannot check active builds: {}. Use a queryable event log (e.g., SQLite) for builds that need to check for concurrent execution.", e));
}
}
// If we reach here, this partition needs to be built
needs_building = true;
}
// Only skip the job if ALL partitions are already available
if !needs_building && available_partitions.len() == outputs.len() {
Ok((false, true, available_partitions)) // Don't build, skip due to all partitions available
} else {
Ok((true, false, available_partitions)) // Need to build (some partitions unavailable)
}
}
fn log_status_summary(
task_states: &HashMap<String, TaskState>,
original_tasks_by_key: &HashMap<String, Arc<Task>>,
) {
let mut pending_tasks = Vec::new();
let mut running_tasks = Vec::new();
let mut succeeded_tasks = Vec::new();
let mut failed_tasks = Vec::new();
for (key, state) in task_states {
let label = original_tasks_by_key.get(key).map_or_else(|| key.as_str(), |t| t.job.as_ref().unwrap().label.as_str());
match state {
TaskState::Pending => pending_tasks.push(label),
TaskState::Running => running_tasks.push(label),
TaskState::Succeeded => succeeded_tasks.push(label),
TaskState::Failed => failed_tasks.push(label),
}
}
info!("Task Status Summary:");
info!(" Pending ({}): {:?}", pending_tasks.len(), pending_tasks);
info!(" Running ({}): {:?}", running_tasks.len(), running_tasks);
info!(" Succeeded ({}): {:?}", succeeded_tasks.len(), succeeded_tasks);
info!(" Failed ({}): {:?}", failed_tasks.len(), failed_tasks);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
simple_logger::SimpleLogger::new()
.with_level(
std::env::var("RUST_LOG")
.unwrap_or_else(|_| "info".to_string())
.parse()
.unwrap_or(log::LevelFilter::Info)
)
.init()?;
// Get build event log configuration from environment variables
let build_event_log_uri = std::env::var("DATABUILD_BUILD_EVENT_LOG").ok();
let build_request_id = std::env::var("DATABUILD_BUILD_REQUEST_ID")
.unwrap_or_else(|_| Uuid::new_v4().to_string());
// Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri {
match create_bel_query_engine(&uri).await {
Ok(log) => {
info!("Initialized build event log: {}", uri);
Some(log)
}
Err(e) => {
error!("Failed to initialize build event log {}: {}", uri, e);
std::process::exit(1);
}
}
} else {
None
};
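// The job graph to execute arrives as JSON on stdin (hypothetically, the
// planner pipes its serialized JobGraph straight into this binary).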
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
let graph: JobGraph = serde_json::from_str(&buffer)?;
info!("Executing job graph with {} nodes", graph.nodes.len());
// Log build request execution start (existing detailed event)
if let Some(ref query_engine) = build_event_log {
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: Some(BuildRequestStatusCode::BuildRequestExecuting.status()),
requested_partitions: graph.outputs.clone(),
message: format!("Starting execution of {} jobs", graph.nodes.len()),
comment: None,
want_id: None,
})
);
if let Err(e) = query_engine.append_event(event).await {
error!("Failed to log execution start event: {}", e);
}
}
let mut task_states: HashMap<String, TaskState> = HashMap::new();
let mut original_tasks_by_key: HashMap<String, Arc<Task>> = HashMap::new();
let graph_nodes_arc: Vec<Arc<Task>> = graph.nodes.into_iter().map(Arc::new).collect();
for task_node in &graph_nodes_arc {
let key = get_task_key(task_node);
task_states.insert(key.clone(), TaskState::Pending);
original_tasks_by_key.insert(key, task_node.clone());
}
let mut completed_outputs: HashSet<String> = HashSet::new();
let mut job_results: Vec<TaskExecutionResult> = Vec::new();
let (task_tx, task_rx): (Sender<Arc<Task>>, Receiver<Arc<Task>>) = crossbeam_channel::unbounded();
let (result_tx, result_rx): (Sender<TaskExecutionResult>, Receiver<TaskExecutionResult>) = crossbeam_channel::unbounded();
let mut worker_handles = Vec::new();
for i in 0..NUM_WORKERS {
let task_rx_clone = task_rx.clone();
let result_tx_clone = result_tx.clone();
worker_handles.push(thread::spawn(move || {
worker(task_rx_clone, result_tx_clone, i + 1);
}));
}
// Drop the original result_tx so that result_rx reports disconnection once
// every worker's clone of the sender has been dropped.
drop(result_tx);
let mut last_log_time = Instant::now();
let mut active_tasks_count = 0;
let mut fail_fast_triggered = false;
loop {
// 1. Process results
while let Ok(result) = result_rx.try_recv() {
active_tasks_count -= 1;
info!(
"Received result for task {}: Success: {}",
result.job_label, result.success
);
let current_state = if result.success {
TaskState::Succeeded
} else {
TaskState::Failed
};
task_states.insert(result.task_key.clone(), current_state);
// Log job completion events
if let Some(ref query_engine) = build_event_log {
if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
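// This run ID is generated here solely for the completion/partition events
// below; it is not the DATABUILD_JOB_RUN_ID the worker exported to the child.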
let job_run_id = Uuid::new_v4().to_string();
// Log job completion
let job_event = create_build_event(
build_request_id.clone(),
EventType::JobEvent(JobEvent {
job_run_id: job_run_id.clone(),
job_label: original_task.job.clone(),
target_partitions: original_task.config.as_ref().unwrap().outputs.clone(),
status_code: if result.success { JobStatus::JobCompleted as i32 } else { JobStatus::JobFailed as i32 },
status_name: if result.success { JobStatus::JobCompleted.to_display_string() } else { JobStatus::JobFailed.to_display_string() },
message: if result.success { "Job completed successfully".to_string() } else { result.error_message.clone().unwrap_or_default() },
config: original_task.config.clone(),
manifests: vec![], // Would be populated from actual job output
})
);
if let Err(e) = query_engine.append_event(job_event).await {
error!("Failed to log job completion event: {}", e);
}
// Log partition status updates
for output_ref in &original_task.config.as_ref().unwrap().outputs {
let partition_event = create_build_event(
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status_code: if result.success { PartitionStatus::PartitionAvailable as i32 } else { PartitionStatus::PartitionFailed as i32 },
status_name: if result.success { PartitionStatus::PartitionAvailable.to_display_string() } else { PartitionStatus::PartitionFailed.to_display_string() },
message: if result.success { "Partition built successfully".to_string() } else { "Partition build failed".to_string() },
job_run_id: job_run_id.clone(),
})
);
if let Err(e) = query_engine.append_event(partition_event).await {
error!("Failed to log partition status event: {}", e);
}
}
}
}
if result.success {
if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
for output_ref in &original_task.config.as_ref().unwrap().outputs {
completed_outputs.insert(output_ref.str.clone());
}
}
} else {
if FAIL_FAST {
warn!("Fail-fast enabled and task {} failed. Shutting down.", result.job_label);
fail_fast_triggered = true;
}
}
job_results.push(result);
}
// 2. Check for fail-fast break
if fail_fast_triggered && active_tasks_count == 0 { // Wait for running tasks to finish if fail fast
info!("All active tasks completed after fail-fast trigger.");
break;
}
if fail_fast_triggered && active_tasks_count > 0 {
// Don't schedule new tasks, just wait for active ones or log
} else if !fail_fast_triggered { // Only dispatch if not in fail-fast shutdown
// 3. Dispatch ready tasks
for task_node in &graph_nodes_arc {
let task_key = get_task_key(task_node);
if task_states.get(&task_key) == Some(&TaskState::Pending) {
if is_task_ready(task_node, &completed_outputs) {
// Check build coordination if event log is available
let (should_build, is_skipped, available_partitions) = if let Some(ref query_engine) = build_event_log {
match check_build_coordination(task_node, query_engine, &build_request_id).await {
Ok((should_build, is_skipped, available_partitions)) => (should_build, is_skipped, available_partitions),
Err(e) => {
error!("Error checking build coordination for {}: {}",
task_node.job.as_ref().unwrap().label, e);
(true, false, Vec::<(PartitionRef, String)>::new()) // Default to building on error
}
}
} else {
(true, false, Vec::<(PartitionRef, String)>::new()) // No event log, always build
};
if !should_build {
if is_skipped {
// Task skipped due to all partitions already available
info!("Task {} skipped - all target partitions already available", task_node.job.as_ref().unwrap().label);
// Log delegation events for each available partition
if let Some(ref query_engine) = build_event_log {
for (partition_ref, source_build_id) in &available_partitions {
let delegation_event = create_build_event(
build_request_id.clone(),
EventType::DelegationEvent(databuild::DelegationEvent {
partition_ref: Some(partition_ref.clone()),
delegated_to_build_request_id: source_build_id.clone(),
message: "Delegated to historical build - partition already available".to_string(),
})
);
if let Err(e) = query_engine.append_event(delegation_event).await {
error!("Failed to log historical delegation event: {}", e);
}
}
// Log JOB_SKIPPED event
let job_run_id = Uuid::new_v4().to_string();
let job_event = create_build_event(
build_request_id.clone(),
EventType::JobEvent(JobEvent {
job_run_id: job_run_id.clone(),
job_label: task_node.job.clone(),
target_partitions: task_node.config.as_ref().unwrap().outputs.clone(),
status_code: JobStatus::JobSkipped as i32,
status_name: JobStatus::JobSkipped.to_display_string(),
message: "Job skipped - all target partitions already available".to_string(),
config: task_node.config.clone(),
manifests: vec![],
})
);
if let Err(e) = query_engine.append_event(job_event).await {
error!("Failed to log job skipped event: {}", e);
}
}
} else {
// Task delegated to active build
info!("Task {} delegated to active build request", task_node.job.as_ref().unwrap().label);
}
task_states.insert(task_key.clone(), TaskState::Succeeded);
// Mark outputs as completed
for output_ref in &task_node.config.as_ref().unwrap().outputs {
completed_outputs.insert(output_ref.str.clone());
}
continue;
}
info!("Dispatching task: {}", task_node.job.as_ref().unwrap().label);
// Log job scheduling events
if let Some(ref query_engine) = build_event_log {
let job_run_id = Uuid::new_v4().to_string();
// Log job scheduled
let job_event = create_build_event(
build_request_id.clone(),
EventType::JobEvent(JobEvent {
job_run_id: job_run_id.clone(),
job_label: task_node.job.clone(),
target_partitions: task_node.config.as_ref().unwrap().outputs.clone(),
status_code: JobStatus::JobScheduled as i32,
status_name: JobStatus::JobScheduled.to_display_string(),
message: "Job scheduled for execution".to_string(),
config: task_node.config.clone(),
manifests: vec![],
})
);
if let Err(e) = query_engine.append_event(job_event).await {
error!("Failed to log job scheduled event: {}", e);
}
// Log partition building status
for output_ref in &task_node.config.as_ref().unwrap().outputs {
let partition_event = create_build_event(
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status_code: PartitionStatus::PartitionBuilding as i32,
status_name: PartitionStatus::PartitionBuilding.to_display_string(),
message: "Partition build started".to_string(),
job_run_id: job_run_id.clone(),
})
);
if let Err(e) = query_engine.append_event(partition_event).await {
error!("Failed to log partition building event: {}", e);
}
}
}
task_states.insert(task_key.clone(), TaskState::Running);
task_tx.send(task_node.clone())?;
active_tasks_count += 1;
}
}
}
}
// 4. Periodic logging
if last_log_time.elapsed() >= LOG_INTERVAL {
log_status_summary(&task_states, &original_tasks_by_key);
// Debug: Check for deadlock (pending tasks with no running tasks)
let has_pending = task_states.values().any(|s| *s == TaskState::Pending);
if has_pending && active_tasks_count == 0 {
warn!("Potential deadlock detected: {} pending tasks with no running tasks",
task_states.values().filter(|s| **s == TaskState::Pending).count());
// Log details of pending tasks and their preconditions
for (key, state) in &task_states {
if *state == TaskState::Pending {
if let Some(task) = original_tasks_by_key.get(key) {
warn!("Pending task: {} ({})", task.job.as_ref().unwrap().label, key);
warn!(" Required inputs:");
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type_code == 1 { // MATERIALIZE = 1
let available = completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str);
warn!(" {} - {}", dep.partition_ref.as_ref().unwrap().str, if available { "AVAILABLE" } else { "MISSING" });
}
}
warn!(" Produces outputs:");
for output in &task.config.as_ref().unwrap().outputs {
warn!(" {}", output.str);
}
}
}
}
}
last_log_time = Instant::now();
}
// 5. Check completion
let all_done = task_states.values().all(|s| *s == TaskState::Succeeded || *s == TaskState::Failed);
if active_tasks_count == 0 && all_done {
info!("All tasks are in a terminal state and no tasks are active.");
break;
}
// Avoid busy-waiting when there is nothing to process; a blocking select over
// the channels would be better, but a short sleep keeps this simple.
thread::sleep(Duration::from_millis(50));
}
info!("Shutting down workers...");
drop(task_tx); // Signal workers to stop by closing the task channel
for handle in worker_handles {
handle.join().expect("Failed to join worker thread");
}
info!("All workers finished.");
// Final processing of any remaining results (should be minimal if loop logic is correct)
while let Ok(result) = result_rx.try_recv() {
active_tasks_count -= 1; // Should be 0
info!(
"Received late result for task {}: Success: {}",
result.job_label, result.success
);
// Update state for completeness, though it might not affect overall outcome now
let current_state = if result.success { TaskState::Succeeded } else { TaskState::Failed };
task_states.insert(result.task_key.clone(), current_state);
job_results.push(result);
}
let success_count = job_results.iter().filter(|r| r.success).count();
let failure_count = job_results.len() - success_count;
info!("Execution complete: {} succeeded, {} failed", success_count, failure_count);
// Log final build request status (existing detailed event)
if let Some(ref query_engine) = build_event_log {
let final_status = if failure_count > 0 || fail_fast_triggered {
BuildRequestStatusCode::BuildRequestFailed
} else {
BuildRequestStatusCode::BuildRequestCompleted
};
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: Some(final_status.status()),
requested_partitions: graph.outputs.clone(),
message: format!("Execution completed: {} succeeded, {} failed", success_count, failure_count),
comment: None,
want_id: None,
})
);
if let Err(e) = query_engine.append_event(event).await {
error!("Failed to log final build request event: {}", e);
}
}
if failure_count > 0 || fail_fast_triggered {
error!("Execution finished with errors.");
std::process::exit(1);
}
Ok(())
}