package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// DataDepType represents the type of data dependency.
type DataDepType string

const (
	Query       DataDepType = "query"
	Materialize DataDepType = "materialize"
)
// DataDep represents a data dependency.
type DataDep struct {
	DepType DataDepType `json:"depType"`
	Ref     string      `json:"ref"`
}

// JobConfig represents the configuration for a job.
type JobConfig struct {
	Inputs  []DataDep         `json:"inputs"`
	Outputs []string          `json:"outputs"`
	Args    []string          `json:"args"`
	Env     map[string]string `json:"env"`
}

// Task represents a job task.
type Task struct {
	JobLabel string    `json:"jobLabel"`
	Config   JobConfig `json:"config"`
}

// getTaskKey generates a unique key for a task from its JobLabel and its
// input/output references.
func getTaskKey(task Task) string {
	// Start with the job label.
	key := task.JobLabel

	// Add input references to the key.
	for _, input := range task.Config.Inputs {
		key += "|input:" + input.Ref
	}

	// Add output references to the key.
	for _, output := range task.Config.Outputs {
		key += "|output:" + output
	}

	return key
}
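
// For illustration (hypothetical values): a task with JobLabel
// "//data:ingest", one input ref "raw.csv", and one output "clean.csv"
// yields the key "//data:ingest|input:raw.csv|output:clean.csv".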

// JobGraph represents a graph of jobs.
type JobGraph struct {
	Outputs []string `json:"outputs"`
	Nodes   []Task   `json:"nodes"`
}
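
// The graph is read as JSON from stdin (see main). A minimal sketch of the
// expected shape, with hypothetical labels and refs:
//
//	{
//	  "outputs": ["clean.csv"],
//	  "nodes": [{
//	    "jobLabel": "//data:ingest",
//	    "config": {
//	      "inputs": [{"depType": "materialize", "ref": "raw.csv"}],
//	      "outputs": ["clean.csv"],
//	      "args": [],
//	      "env": {}
//	    }
//	  }]
//	}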

// JobResult represents the result of a job execution.
type JobResult struct {
	Task      Task
	Success   bool
	Error     error
	StartTime time.Time
	EndTime   time.Time
	StdOut    string
	StdErr    string
}

// TaskState represents the state of a task.
type TaskState string

const (
	TaskNotReady  TaskState = "not_ready"
	TaskReady     TaskState = "ready"
	TaskScheduled TaskState = "scheduled"
	TaskRunning   TaskState = "running"
	TaskFailed    TaskState = "failed"
	TaskSucceeded TaskState = "succeeded"
)
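
// Tasks move through these states in order: not_ready -> ready -> scheduled
// -> running, ending in either succeeded or failed (both terminal).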

// Executor manages the execution of jobs in the graph.
type Executor struct {
	graph          *JobGraph
	completedJobs  map[string]bool
	completedMutex sync.Mutex
	jobResults     []JobResult
	resultsMutex   sync.Mutex
	runningJobs    int
	runningMutex   sync.Mutex
	wg             sync.WaitGroup
	failFast       bool
	failedJob      bool
	failedMutex    sync.Mutex
	taskStates     map[string]TaskState
	statesMutex    sync.Mutex
}

// NewExecutor creates a new executor for the given job graph.
func NewExecutor(graph *JobGraph, failFast bool) *Executor {
	executor := &Executor{
		graph:         graph,
		completedJobs: make(map[string]bool),
		failFast:      failFast,
		taskStates:    make(map[string]TaskState),
	}

	// Initialize all tasks as not ready.
	for _, task := range graph.Nodes {
		executor.taskStates[getTaskKey(task)] = TaskNotReady
	}

	return executor
}

// Execute runs all jobs in the graph and returns their results.
func (e *Executor) Execute() ([]JobResult, error) {
	// Start the periodic logging of task states.
	stopChan := make(chan struct{})
	go e.logRunningJobsPeriodically(5*time.Second, stopChan)

	// Channel of tasks to be executed, buffered to the node count so
	// scheduling never blocks.
	taskChan := make(chan Task, len(e.graph.Nodes))

	// Channel on which workers signal that a task finished and new tasks
	// may have become ready, buffered so workers never block on it.
	newTasksChan := make(chan struct{}, len(e.graph.Nodes))

	// Start worker goroutines.
	numWorkers := 4 // could be made configurable
	for i := 0; i < numWorkers; i++ {
		go e.worker(taskChan, newTasksChan)
	}

	// Schedule the initial tasks (those with no dependencies or with all
	// dependencies already satisfied).
	e.scheduleReadyTasks(taskChan)

	// Poll until every task reaches a terminal state, or until fail-fast
	// has stopped scheduling and the last running task has drained.
	done := false
	for !done {
		select {
		case <-newTasksChan:
			// A task finished; schedule anything that became ready.
			e.scheduleReadyTasks(taskChan)
		default:
			allDone := true

			// Any running task means we are not done.
			e.runningMutex.Lock()
			if e.runningJobs > 0 {
				allDone = false
			}
			e.runningMutex.Unlock()

			if allDone {
				e.failedMutex.Lock()
				failed := e.failedJob
				e.failedMutex.Unlock()

				// Under fail-fast a failure stops all further scheduling,
				// so an idle executor is finished even if some tasks never
				// became ready. Otherwise, every task must have reached a
				// terminal state (succeeded or failed).
				if !(failed && e.failFast) {
					e.statesMutex.Lock()
					for _, state := range e.taskStates {
						if state != TaskSucceeded && state != TaskFailed {
							allDone = false
							break
						}
					}
					e.statesMutex.Unlock()
				}
			}

			if allDone {
				done = true
			} else {
				// Sleep briefly to avoid busy-waiting.
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	close(taskChan)

	// Stop the periodic logging.
	stopChan <- struct{}{}

	// Surface the failure if fail-fast is enabled.
	e.failedMutex.Lock()
	failed := e.failedJob
	e.failedMutex.Unlock()

	if failed && e.failFast {
		return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled")
	}

	return e.jobResults, nil
}

// scheduleReadyTasks schedules every task that is ready to be executed.
func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) {
	// Do not schedule anything further once a task has failed and
	// fail-fast is enabled.
	e.failedMutex.Lock()
	if e.failedJob && e.failFast {
		e.failedMutex.Unlock()
		return
	}
	e.failedMutex.Unlock()

	for _, task := range e.graph.Nodes {
		if !e.isTaskReady(task) {
			continue
		}

		// Move the task from not_ready (or ready) to scheduled under a
		// single lock so it cannot be enqueued twice; tasks already
		// scheduled, running, or finished are left alone.
		taskKey := getTaskKey(task)
		e.statesMutex.Lock()
		state := e.taskStates[taskKey]
		if state == TaskNotReady || state == TaskReady {
			e.taskStates[taskKey] = TaskScheduled
			e.statesMutex.Unlock()

			e.wg.Add(1)
			taskChan <- task
		} else {
			e.statesMutex.Unlock()
		}
	}
}

// isTaskReady checks if a task is ready to be executed.
func (e *Executor) isTaskReady(task Task) bool {
	// A task that has already completed is never ready again.
	taskKey := getTaskKey(task)
	e.completedMutex.Lock()
	if e.completedJobs[taskKey] {
		e.completedMutex.Unlock()
		return false
	}
	e.completedMutex.Unlock()

	// Check that all materialize dependencies are satisfied.
	for _, input := range task.Config.Inputs {
		if input.DepType == Materialize {
			e.completedMutex.Lock()
			if !e.completedJobs[input.Ref] {
				e.completedMutex.Unlock()
				return false
			}
			e.completedMutex.Unlock()
		}
	}

	return true
}
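
// For example (hypothetical refs): a task with the materialize input
// {"depType": "materialize", "ref": "clean.csv"} becomes ready once some
// completed task listed "clean.csv" among its outputs. Query inputs do not
// gate readiness.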

// worker processes tasks from the task channel.
func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) {
	for task := range taskChan {
		// Under fail-fast, skip tasks that were already queued when a
		// failure occurred.
		if e.failFast {
			e.failedMutex.Lock()
			if e.failedJob {
				e.failedMutex.Unlock()
				e.wg.Done()
				continue
			}
			e.failedMutex.Unlock()
		}

		// Update task state to running.
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskRunning
		e.statesMutex.Unlock()

		// Execute the task.
		e.executeTask(task)

		// Signal that a task has completed and new tasks might be ready.
		newTasksChan <- struct{}{}
	}
}

// executeTask executes a single task.
func (e *Executor) executeTask(task Task) {
	// Track in-flight jobs; the deferred decrement also covers the
	// early-return error paths below.
	e.runningMutex.Lock()
	e.runningJobs++
	e.runningMutex.Unlock()
	defer func() {
		e.runningMutex.Lock()
		e.runningJobs--
		e.runningMutex.Unlock()
	}()
	defer e.wg.Done()

	// Log start of job and record the start time.
	log.Printf("Starting job: %s", task.JobLabel)
	startTime := time.Now()

	// fail handles errors that occur before the command completes: it
	// records the result, marks the task failed, marks the task and its
	// declared outputs completed (matching the post-run path below), and
	// trips the fail-fast flag so no further tasks are scheduled.
	fail := func(err error, context string) {
		log.Printf("%s: %v", context, err)
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")

		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()

		e.completedMutex.Lock()
		e.completedJobs[getTaskKey(task)] = true
		for _, output := range task.Config.Outputs {
			e.completedJobs[output] = true
		}
		e.completedMutex.Unlock()

		e.failedMutex.Lock()
		e.failedJob = true
		e.failedMutex.Unlock()
	}

	// Buffers for the child process's stdout and stderr.
	var stdoutBuf, stderrBuf strings.Builder

	// Resolve the executable from runfiles and build the command.
	execPath := resolveExecutableFromRunfiles(task.JobLabel)
	cmd := exec.Command(execPath)

	// Pass through the system environment only; job-specific settings
	// travel in the config JSON on stdin.
	cmd.Env = os.Environ()

	// Convert the job config to JSON for stdin.
	configJSON, err := json.Marshal(task.Config)
	if err != nil {
		fail(err, "Error marshaling job config")
		return
	}

	// Set up the stdio pipes.
	stdin, err := cmd.StdinPipe()
	if err != nil {
		fail(err, "Error creating stdin pipe")
		return
	}

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		stdin.Close()
		fail(err, "Error creating stdout pipe")
		return
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		stdin.Close()
		fail(err, "Error creating stderr pipe")
		return
	}

	// Start the command.
	if err := cmd.Start(); err != nil {
		stdin.Close()
		fail(err, "Error starting command")
		return
	}

	// Write the config JSON to stdin and close it so the child sees EOF.
	_, err = stdin.Write(configJSON)
	stdin.Close()
	if err != nil {
		fail(err, "Error writing config JSON to stdin")
		return
	}

	// Drain stdout and stderr concurrently. cmd.Wait closes the pipes, so
	// both copies must finish before it is called.
	var copyWG sync.WaitGroup
	copyWG.Add(2)
	go func() {
		defer copyWG.Done()
		io.Copy(&stdoutBuf, stdout)
	}()
	go func() {
		defer copyWG.Done()
		io.Copy(&stderrBuf, stderr)
	}()
	copyWG.Wait()

	// Wait for the command to complete.
	err = cmd.Wait()
	endTime := time.Now()

	// Record the result.
	success := err == nil
	e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String())

	// Log completion and move the task to a terminal state.
	if success {
		log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime))
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskSucceeded
		e.statesMutex.Unlock()
	} else {
		log.Printf("Job failed: %s (duration: %v) Error: %v\nStderr: %s", task.JobLabel, endTime.Sub(startTime), err, stderrBuf.String())
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
	}

	// Mark the job and its declared outputs as completed so dependent
	// tasks can become ready. Outputs are marked even on failure; under
	// fail-fast nothing further is scheduled anyway.
	e.completedMutex.Lock()
	e.completedJobs[getTaskKey(task)] = true
	for _, output := range task.Config.Outputs {
		e.completedJobs[output] = true
	}
	e.completedMutex.Unlock()

	// Flag the failure so fail-fast can stop scheduling.
	if !success {
		e.failedMutex.Lock()
		e.failedJob = true
		e.failedMutex.Unlock()

		log.Printf("Task failed: %s. Not scheduling any more tasks. Stderr: %s", task.JobLabel, stderrBuf.String())
	}
}
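
// A conforming job executable (not part of this file) reads its JobConfig
// as JSON from stdin and reports success via a zero exit status. A minimal
// sketch, assuming the job needs nothing beyond the config:
//
//	func main() {
//		var cfg JobConfig
//		if err := json.NewDecoder(os.Stdin).Decode(&cfg); err != nil {
//			log.Fatalf("decoding config: %v", err)
//		}
//		// ... perform the work described by cfg ...
//	}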

// resolveExecutableFromRunfiles resolves the executable path from runfiles.
func resolveExecutableFromRunfiles(jobLabel string) string {
	// Get the runfiles directory from the environment.
	runfilesDir := os.Getenv("RUNFILES_DIR")
	if runfilesDir == "" {
		// If RUNFILES_DIR is not set, try to derive it from the path of
		// the current executable.
		execPath, err := os.Executable()
		if err == nil {
			runfilesDir = execPath + ".runfiles"
		}
	}

	// If we still don't have a runfiles directory, fall back to using the
	// job label directly.
	if runfilesDir == "" {
		log.Printf("Warning: RUNFILES_DIR not found, falling back to direct executable path")
		return jobLabel + ".exec"
	}

	// Construct the path to the executable inside the runfiles directory:
	// it lives under _main with the target name plus a ".exec" suffix.
	// Extract the target name from the job label (which may have the form
	// "//package:target").
	targetName := jobLabel
	if colonIndex := strings.LastIndex(jobLabel, ":"); colonIndex != -1 {
		targetName = jobLabel[colonIndex+1:]
	} else {
		// Without a colon the label may be a path like "//package/target".
		targetName = filepath.Base(jobLabel)
	}

	execName := targetName + ".exec"
	execPath := filepath.Join(runfilesDir, "_main", execName)

	log.Printf("Resolved executable path: %s", execPath)
	return execPath
}
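
// Example (hypothetical values): with RUNFILES_DIR=/tmp/run.runfiles, the
// label "//jobs:ingest" resolves to "/tmp/run.runfiles/_main/ingest.exec".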

// recordJobResult records the result of a job execution.
func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) {
	result := JobResult{
		Task:      task,
		Success:   success,
		Error:     err,
		StartTime: startTime,
		EndTime:   endTime,
		StdOut:    stdout,
		StdErr:    stderr,
	}

	e.resultsMutex.Lock()
	e.jobResults = append(e.jobResults, result)
	e.resultsMutex.Unlock()
}

// logRunningJobsPeriodically logs the tasks in each state periodically.
func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Collect the labels of the tasks in each state.
			notReadyTasks := []string{}
			readyTasks := []string{}
			scheduledOrRunningTasks := []string{}
			failedTasks := []string{}
			succeededTasks := []string{}

			e.statesMutex.Lock()
			for taskKey, state := range e.taskStates {
				// Extract the job label from the task key for display.
				jobLabel := taskKey
				if idx := strings.Index(taskKey, "|"); idx > 0 {
					jobLabel = taskKey[:idx]
				}

				switch state {
				case TaskNotReady:
					notReadyTasks = append(notReadyTasks, jobLabel)
				case TaskReady:
					readyTasks = append(readyTasks, jobLabel)
				case TaskScheduled, TaskRunning:
					scheduledOrRunningTasks = append(scheduledOrRunningTasks, jobLabel)
				case TaskFailed:
					failedTasks = append(failedTasks, jobLabel)
				case TaskSucceeded:
					succeededTasks = append(succeededTasks, jobLabel)
				}
			}
			e.statesMutex.Unlock()

			// Log the counts and task labels for each state.
			log.Printf("Task Status Summary:")
			log.Printf("  Not Ready (%d): %v", len(notReadyTasks), notReadyTasks)
			log.Printf("  Ready (%d): %v", len(readyTasks), readyTasks)
			log.Printf("  Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks)
			log.Printf("  Failed (%d): %v", len(failedTasks), failedTasks)
			log.Printf("  Succeeded (%d): %v", len(succeededTasks), succeededTasks)
		case <-stopChan:
			return
		}
	}
}

// main reads a job graph from stdin, executes it, and reports the results.
func main() {
	// Read the job graph from stdin.
	var graph JobGraph
	decoder := json.NewDecoder(os.Stdin)
	if err := decoder.Decode(&graph); err != nil {
		log.Fatalf("Error decoding job graph: %v", err)
	}

	log.Printf("Executing job graph with %d nodes", len(graph.Nodes))

	// Create an executor with fail-fast enabled.
	executor := NewExecutor(&graph, true)

	// Execute the graph.
	results, err := executor.Execute()

	// Tally and log the results.
	successCount := 0
	failureCount := 0
	for _, result := range results {
		if result.Success {
			successCount++
		} else {
			failureCount++
		}
	}

	log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount)

	if err != nil {
		log.Fatalf("Execution failed: %v", err)
	}

	if failureCount > 0 {
		os.Exit(1)
	}
}
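
// Example invocation (binary and file names hypothetical):
//
//	./executor < graph.json
//
// where graph.json matches the JobGraph shape shown above. The process
// exits non-zero if any job fails.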