package main import ( "encoding/json" "fmt" "io" "log" "os" "os/exec" "path/filepath" "strings" "sync" "time" ) // DataDepType represents the type of data dependency type DataDepType string const ( Query DataDepType = "query" Materialize DataDepType = "materialize" ) // DataDep represents a data dependency type DataDep struct { DepType DataDepType `json:"depType"` Ref string `json:"ref"` } // JobConfig represents the configuration for a job type JobConfig struct { Inputs []DataDep `json:"inputs"` Outputs []string `json:"outputs"` Args []string `json:"args"` Env map[string]string `json:"env"` } // Task represents a job task type Task struct { JobLabel string `json:"jobLabel"` Config JobConfig `json:"config"` } // getTaskKey generates a unique key for a task based on its JobLabel and input/output references func getTaskKey(task Task) string { // Start with the job label key := task.JobLabel // Add input references to the key for _, input := range task.Config.Inputs { key += "|input:" + input.Ref } // Add output references to the key for _, output := range task.Config.Outputs { key += "|output:" + output } return key } // JobGraph represents a graph of jobs type JobGraph struct { Outputs []string `json:"outputs"` Nodes []Task `json:"nodes"` } // JobResult represents the result of a job execution type JobResult struct { Task Task Success bool Error error StartTime time.Time EndTime time.Time StdOut string StdErr string } // TaskState represents the state of a task type TaskState string const ( TaskNotReady TaskState = "not_ready" TaskReady TaskState = "ready" TaskScheduled TaskState = "scheduled" TaskRunning TaskState = "running" TaskFailed TaskState = "failed" TaskSucceeded TaskState = "succeeded" ) // Executor manages the execution of jobs in the graph type Executor struct { graph *JobGraph completedJobs map[string]bool completedMutex sync.Mutex jobResults []JobResult resultsMutex sync.Mutex runningJobs int runningMutex sync.Mutex wg sync.WaitGroup failFast bool failedJob bool failedMutex sync.Mutex taskStates map[string]TaskState statesMutex sync.Mutex } // NewExecutor creates a new executor for the given job graph func NewExecutor(graph *JobGraph, failFast bool) *Executor { executor := &Executor{ graph: graph, completedJobs: make(map[string]bool), failFast: failFast, taskStates: make(map[string]TaskState), } // Initialize all tasks as not ready for _, task := range graph.Nodes { executor.taskStates[getTaskKey(task)] = TaskNotReady } return executor } // Execute runs all jobs in the graph func (e *Executor) Execute() ([]JobResult, error) { // Start the periodic logging of running jobs stopChan := make(chan struct{}) go e.logRunningJobsPeriodically(5*time.Second, stopChan) // Create a channel for tasks to be executed taskChan := make(chan Task, len(e.graph.Nodes)) // Create a channel for signaling when to check for new tasks newTasksChan := make(chan struct{}, len(e.graph.Nodes)) // Start worker goroutines numWorkers := 4 // Can be made configurable for i := 0; i < numWorkers; i++ { go e.worker(taskChan, newTasksChan) } // Schedule initial tasks (those with no dependencies or with all dependencies satisfied) e.scheduleReadyTasks(taskChan) // Process new tasks signals until all tasks are complete done := false for !done { select { case <-newTasksChan: // Schedule any new ready tasks e.scheduleReadyTasks(taskChan) default: // Check if all tasks are done allDone := true // Check if there are any running tasks e.runningMutex.Lock() if e.runningJobs > 0 { allDone = false } e.runningMutex.Unlock() // Check if all tasks are in a terminal state (succeeded or failed) if allDone { e.statesMutex.Lock() for _, state := range e.taskStates { if state != TaskSucceeded && state != TaskFailed { allDone = false break } } e.statesMutex.Unlock() } if allDone { done = true } else { // Sleep a bit to avoid busy waiting time.Sleep(100 * time.Millisecond) } } } close(taskChan) // Stop the periodic logging stopChan <- struct{}{} // Check if any job failed e.failedMutex.Lock() failed := e.failedJob e.failedMutex.Unlock() if failed && e.failFast { return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled") } return e.jobResults, nil } // scheduleReadyTasks schedules all tasks that are ready to be executed func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) { // Check if any task has failed and fail-fast is enabled e.failedMutex.Lock() if e.failedJob && e.failFast { e.failedMutex.Unlock() return } e.failedMutex.Unlock() for _, task := range e.graph.Nodes { if e.isTaskReady(task) { // Update task state to ready taskKey := getTaskKey(task) e.statesMutex.Lock() currentState := e.taskStates[taskKey] if currentState == TaskNotReady { e.taskStates[taskKey] = TaskReady } e.statesMutex.Unlock() // Update task state to scheduled and add to execution queue e.statesMutex.Lock() if e.taskStates[taskKey] == TaskReady { e.taskStates[taskKey] = TaskScheduled e.statesMutex.Unlock() e.wg.Add(1) taskChan <- task } else { e.statesMutex.Unlock() } } } } // isTaskReady checks if a task is ready to be executed func (e *Executor) isTaskReady(task Task) bool { // Check if the task has already been completed taskKey := getTaskKey(task) e.completedMutex.Lock() if e.completedJobs[taskKey] { e.completedMutex.Unlock() return false } e.completedMutex.Unlock() // Check if all dependencies are satisfied for _, input := range task.Config.Inputs { if input.DepType == Materialize { e.completedMutex.Lock() if !e.completedJobs[input.Ref] { e.completedMutex.Unlock() return false } e.completedMutex.Unlock() } } return true } // worker processes tasks from the task channel func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) { for task := range taskChan { // Check if we should continue execution if e.failFast { e.failedMutex.Lock() if e.failedJob { e.failedMutex.Unlock() e.wg.Done() continue } e.failedMutex.Unlock() } // Update task state to running e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskRunning e.statesMutex.Unlock() // Execute the task e.executeTask(task) // Signal that a task has completed and new tasks might be ready newTasksChan <- struct{}{} } } // executeTask executes a single task func (e *Executor) executeTask(task Task) { // Increment running jobs counter e.runningMutex.Lock() e.runningJobs++ e.runningMutex.Unlock() // Log start of job log.Printf("Starting job: %s", task.JobLabel) // Record start time startTime := time.Now() // Prepare for capturing stdout and stderr var stdoutBuf, stderrBuf strings.Builder // Execute the job // Resolve the executable from runfiles execPath := resolveExecutableFromRunfiles(task.JobLabel) cmd := exec.Command(execPath) // Set environment variables (only system environment, not job-specific) cmd.Env = os.Environ() // Convert job config to JSON for stdin configJSON, err := json.Marshal(task.Config) if err != nil { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return } // Set up stdin pipe stdin, err := cmd.StdinPipe() if err != nil { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return } // Capture stdout and stderr stdout, err := cmd.StdoutPipe() if err != nil { stdin.Close() e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return } stderr, err := cmd.StderrPipe() if err != nil { stdin.Close() e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return } // Start the command if err := cmd.Start(); err != nil { stdin.Close() e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() // Log the error and return log.Printf("Error starting command: %v", err) return } // Write config JSON to stdin and close it _, err = stdin.Write(configJSON) stdin.Close() if err != nil { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() // Log the error and return log.Printf("Error writing config JSON to stdin: %v", err) return } // Copy stdout and stderr to buffers go io.Copy(&stdoutBuf, stdout) go io.Copy(&stderrBuf, stderr) // Wait for the command to complete err = cmd.Wait() endTime := time.Now() // Record the result success := err == nil e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String()) // Log completion if success { log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime)) // Update task state to succeeded e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskSucceeded e.statesMutex.Unlock() } else { log.Printf("Job failed: %s (duration: %v) Error: %v\nStderr: %s", task.JobLabel, endTime.Sub(startTime), err, stderrBuf.String()) // Update task state to failed e.statesMutex.Lock() e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() } // Decrement running jobs counter e.runningMutex.Lock() e.runningJobs-- e.runningMutex.Unlock() // Mark the job as completed e.completedMutex.Lock() e.completedJobs[getTaskKey(task)] = true for _, output := range task.Config.Outputs { e.completedJobs[output] = true } e.completedMutex.Unlock() // Mark as failed if needed if !success { e.failedMutex.Lock() e.failedJob = true e.failedMutex.Unlock() // Log the failed task log.Printf("Task failed: %s. Not scheduling any more tasks. Stderr: %s", task.JobLabel, stderrBuf.String()) } e.wg.Done() } // resolveExecutableFromRunfiles resolves the executable path from runfiles func resolveExecutableFromRunfiles(jobLabel string) string { // Get the runfiles directory from the environment runfilesDir := os.Getenv("RUNFILES_DIR") if runfilesDir == "" { // If RUNFILES_DIR is not set, try to find it from the executable path execPath, err := os.Executable() if err == nil { runfilesDir = execPath + ".runfiles" } } // If we still don't have a runfiles directory, fall back to the old behavior if runfilesDir == "" { log.Printf("Warning: RUNFILES_DIR not found, falling back to direct executable path") return jobLabel + ".exec" } // Construct the path to the executable in the runfiles directory // The executable should be in the _main directory with the job label name + ".exec" // Extract the target name from the job label (which might be in the format "//package:target") targetName := jobLabel if colonIndex := strings.LastIndex(jobLabel, ":"); colonIndex != -1 { targetName = jobLabel[colonIndex+1:] } else { // If there's no colon, it might be a path like "//package/target" targetName = filepath.Base(jobLabel) } execName := targetName + ".exec" execPath := filepath.Join(runfilesDir, "_main", execName) log.Printf("Resolved executable path: %s", execPath) return execPath } // recordJobResult records the result of a job execution func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) { result := JobResult{ Task: task, Success: success, Error: err, StartTime: startTime, EndTime: endTime, StdOut: stdout, StdErr: stderr, } e.resultsMutex.Lock() e.jobResults = append(e.jobResults, result) e.resultsMutex.Unlock() } // logRunningJobsPeriodically logs the tasks in each state periodically func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: // Count tasks in each state notReadyTasks := []string{} readyTasks := []string{} scheduledOrRunningTasks := []string{} failedTasks := []string{} succeededTasks := []string{} e.statesMutex.Lock() for taskKey, state := range e.taskStates { // Extract the job label from the task key for display jobLabel := taskKey if idx := strings.Index(taskKey, "|"); idx > 0 { jobLabel = taskKey[:idx] } switch state { case TaskNotReady: notReadyTasks = append(notReadyTasks, jobLabel) case TaskReady: readyTasks = append(readyTasks, jobLabel) case TaskScheduled, TaskRunning: scheduledOrRunningTasks = append(scheduledOrRunningTasks, jobLabel) case TaskFailed: failedTasks = append(failedTasks, jobLabel) case TaskSucceeded: succeededTasks = append(succeededTasks, jobLabel) } } e.statesMutex.Unlock() // Log the counts and task labels for each state log.Printf("Task Status Summary:") log.Printf(" Not Ready (%d): %v", len(notReadyTasks), notReadyTasks) log.Printf(" Ready (%d): %v", len(readyTasks), readyTasks) log.Printf(" Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks) log.Printf(" Failed (%d): %v", len(failedTasks), failedTasks) log.Printf(" Succeeded (%d): %v", len(succeededTasks), succeededTasks) case <-stopChan: return } } } func main() { // Read the job graph from stdin var graph JobGraph decoder := json.NewDecoder(os.Stdin) if err := decoder.Decode(&graph); err != nil { log.Fatalf("Error decoding job graph: %v", err) } log.Printf("Executing job graph with %d nodes", len(graph.Nodes)) // Create an executor with fail-fast enabled executor := NewExecutor(&graph, true) // Execute the graph results, err := executor.Execute() // Log the results successCount := 0 failureCount := 0 for _, result := range results { if result.Success { successCount++ } else { failureCount++ } } log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount) if err != nil { log.Fatalf("Execution failed: %v", err) } if failureCount > 0 { os.Exit(1) } }