From 2b2c136c67fc7d15fb2a13c5239dc381d7af978b Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Wed, 7 May 2025 20:14:13 -0700 Subject: [PATCH] Fully replace go execute --- databuild/graph/BUILD.bazel | 11 +- databuild/graph/execute.go | 586 ------------------ ...per.sh.tpl => rust_execute_wrapper.sh.tpl} | 2 +- databuild/rules.bzl | 2 +- 4 files changed, 4 insertions(+), 597 deletions(-) delete mode 100644 databuild/graph/execute.go rename databuild/graph/{go_exec_wrapper.sh.tpl => rust_execute_wrapper.sh.tpl} (80%) diff --git a/databuild/graph/BUILD.bazel b/databuild/graph/BUILD.bazel index bb500b0..1e229fa 100644 --- a/databuild/graph/BUILD.bazel +++ b/databuild/graph/BUILD.bazel @@ -2,19 +2,12 @@ load("@rules_go//go:def.bzl", "go_binary") load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library") exports_files([ - "go_analyze_wrapper.sh.tpl", - "go_exec_wrapper.sh.tpl", "rust_analyze_wrapper.sh.tpl", + "rust_execute_wrapper.sh.tpl", ]) -go_binary( - name = "execute", - srcs = ["execute.go"], - visibility = ["//visibility:public"], -) - rust_binary( - name = "execute_rs", + name = "execute", srcs = ["execute.rs"], edition = "2021", deps = [ diff --git a/databuild/graph/execute.go b/databuild/graph/execute.go deleted file mode 100644 index 640f95d..0000000 --- a/databuild/graph/execute.go +++ /dev/null @@ -1,586 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "io" - "log" - "os" - "os/exec" - "path/filepath" - "strings" - "sync" - "time" -) - -// DataDepType represents the type of data dependency -type DataDepType string - -const ( - Query DataDepType = "query" - Materialize DataDepType = "materialize" -) - -// DataDep represents a data dependency -type DataDep struct { - DepType DataDepType `json:"depType"` - Ref string `json:"ref"` -} - -// JobConfig represents the configuration for a job -type JobConfig struct { - Inputs []DataDep `json:"inputs"` - Outputs []string `json:"outputs"` - Args []string `json:"args"` - Env map[string]string `json:"env"` -} - -// Task represents a job task -type Task struct { - JobLabel string `json:"jobLabel"` - Config JobConfig `json:"config"` -} - -// getTaskKey generates a unique key for a task based on its JobLabel and input/output references -func getTaskKey(task Task) string { - // Start with the job label - key := task.JobLabel - - // Add input references to the key - for _, input := range task.Config.Inputs { - key += "|input:" + input.Ref - } - - // Add output references to the key - for _, output := range task.Config.Outputs { - key += "|output:" + output - } - - return key -} - -// JobGraph represents a graph of jobs -type JobGraph struct { - Outputs []string `json:"outputs"` - Nodes []Task `json:"nodes"` -} - -// JobResult represents the result of a job execution -type JobResult struct { - Task Task - Success bool - Error error - StartTime time.Time - EndTime time.Time - StdOut string - StdErr string -} - -// TaskState represents the state of a task -type TaskState string - -const ( - TaskNotReady TaskState = "not_ready" - TaskReady TaskState = "ready" - TaskScheduled TaskState = "scheduled" - TaskRunning TaskState = "running" - TaskFailed TaskState = "failed" - TaskSucceeded TaskState = "succeeded" -) - -// Executor manages the execution of jobs in the graph -type Executor struct { - graph *JobGraph - completedJobs map[string]bool - completedMutex sync.Mutex - jobResults []JobResult - resultsMutex sync.Mutex - runningJobs int - runningMutex sync.Mutex - wg sync.WaitGroup - failFast bool - failedJob bool - failedMutex sync.Mutex - taskStates map[string]TaskState - statesMutex sync.Mutex -} - -// NewExecutor creates a new executor for the given job graph -func NewExecutor(graph *JobGraph, failFast bool) *Executor { - executor := &Executor{ - graph: graph, - completedJobs: make(map[string]bool), - failFast: failFast, - taskStates: make(map[string]TaskState), - } - - // Initialize all tasks as not ready - for _, task := range graph.Nodes { - executor.taskStates[getTaskKey(task)] = TaskNotReady - } - - return executor -} - -// Execute runs all jobs in the graph -func (e *Executor) Execute() ([]JobResult, error) { - // Start the periodic logging of running jobs - stopChan := make(chan struct{}) - go e.logRunningJobsPeriodically(5*time.Second, stopChan) - - // Create a channel for tasks to be executed - taskChan := make(chan Task, len(e.graph.Nodes)) - - // Create a channel for signaling when to check for new tasks - newTasksChan := make(chan struct{}, len(e.graph.Nodes)) - - // Start worker goroutines - numWorkers := 4 // Can be made configurable - for i := 0; i < numWorkers; i++ { - go e.worker(taskChan, newTasksChan) - } - - // Schedule initial tasks (those with no dependencies or with all dependencies satisfied) - e.scheduleReadyTasks(taskChan) - - // Process new tasks signals until all tasks are complete - done := false - for !done { - select { - case <-newTasksChan: - // Schedule any new ready tasks - e.scheduleReadyTasks(taskChan) - default: - // Check if all tasks are done - allDone := true - - // Check if there are any running tasks - e.runningMutex.Lock() - if e.runningJobs > 0 { - allDone = false - } - e.runningMutex.Unlock() - - // Check if all tasks are in a terminal state (succeeded or failed) - if allDone { - e.statesMutex.Lock() - for _, state := range e.taskStates { - if state != TaskSucceeded && state != TaskFailed { - allDone = false - break - } - } - e.statesMutex.Unlock() - } - - if allDone { - done = true - } else { - // Sleep a bit to avoid busy waiting - time.Sleep(100 * time.Millisecond) - } - } - } - - close(taskChan) - - // Stop the periodic logging - stopChan <- struct{}{} - - // Check if any job failed - e.failedMutex.Lock() - failed := e.failedJob - e.failedMutex.Unlock() - - if failed && e.failFast { - return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled") - } - - return e.jobResults, nil -} - -// scheduleReadyTasks schedules all tasks that are ready to be executed -func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) { - // Check if any task has failed and fail-fast is enabled - e.failedMutex.Lock() - if e.failedJob && e.failFast { - e.failedMutex.Unlock() - return - } - e.failedMutex.Unlock() - - for _, task := range e.graph.Nodes { - if e.isTaskReady(task) { - // Update task state to ready - taskKey := getTaskKey(task) - e.statesMutex.Lock() - currentState := e.taskStates[taskKey] - if currentState == TaskNotReady { - e.taskStates[taskKey] = TaskReady - } - e.statesMutex.Unlock() - - // Update task state to scheduled and add to execution queue - e.statesMutex.Lock() - if e.taskStates[taskKey] == TaskReady { - e.taskStates[taskKey] = TaskScheduled - e.statesMutex.Unlock() - - e.wg.Add(1) - taskChan <- task - } else { - e.statesMutex.Unlock() - } - } - } -} - -// isTaskReady checks if a task is ready to be executed -func (e *Executor) isTaskReady(task Task) bool { - // Check if the task has already been completed - taskKey := getTaskKey(task) - e.completedMutex.Lock() - if e.completedJobs[taskKey] { - e.completedMutex.Unlock() - return false - } - e.completedMutex.Unlock() - - // Check if all dependencies are satisfied - for _, input := range task.Config.Inputs { - if input.DepType == Materialize { - e.completedMutex.Lock() - if !e.completedJobs[input.Ref] { - e.completedMutex.Unlock() - return false - } - e.completedMutex.Unlock() - } - } - - return true -} - -// worker processes tasks from the task channel -func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) { - for task := range taskChan { - // Check if we should continue execution - if e.failFast { - e.failedMutex.Lock() - if e.failedJob { - e.failedMutex.Unlock() - e.wg.Done() - continue - } - e.failedMutex.Unlock() - } - - // Update task state to running - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskRunning - e.statesMutex.Unlock() - - // Execute the task - e.executeTask(task) - - // Signal that a task has completed and new tasks might be ready - newTasksChan <- struct{}{} - } -} - -// executeTask executes a single task -func (e *Executor) executeTask(task Task) { - // Increment running jobs counter - e.runningMutex.Lock() - e.runningJobs++ - e.runningMutex.Unlock() - - // Log start of job - log.Printf("Starting job: %s", task.JobLabel) - - // Record start time - startTime := time.Now() - - // Prepare for capturing stdout and stderr - var stdoutBuf, stderrBuf strings.Builder - - // Execute the job - // Resolve the executable from runfiles - execPath := resolveExecutableFromRunfiles(task.JobLabel) - cmd := exec.Command(execPath) - - // Set environment variables (only system environment, not job-specific) - cmd.Env = os.Environ() - - // Convert job config to JSON for stdin - configJSON, err := json.Marshal(task.Config) - if err != nil { - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - return - } - - // Set up stdin pipe - stdin, err := cmd.StdinPipe() - if err != nil { - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - return - } - - // Capture stdout and stderr - stdout, err := cmd.StdoutPipe() - if err != nil { - stdin.Close() - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - return - } - - stderr, err := cmd.StderrPipe() - if err != nil { - stdin.Close() - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - return - } - - // Start the command - if err := cmd.Start(); err != nil { - stdin.Close() - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - // Log the error and return - log.Printf("Error starting command: %v", err) - return - } - - // Write config JSON to stdin and close it - _, err = stdin.Write(configJSON) - stdin.Close() - if err != nil { - e.recordJobResult(task, false, err, startTime, time.Now(), "", "") - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - e.wg.Done() - // Log the error and return - log.Printf("Error writing config JSON to stdin: %v", err) - return - } - - // Copy stdout and stderr to buffers - go io.Copy(&stdoutBuf, stdout) - go io.Copy(&stderrBuf, stderr) - - // Wait for the command to complete - err = cmd.Wait() - endTime := time.Now() - - // Record the result - success := err == nil - e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String()) - - // Log completion - if success { - log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime)) - // Update task state to succeeded - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskSucceeded - e.statesMutex.Unlock() - } else { - log.Printf("Job failed: %s (duration: %v) Error: %v\nStderr: %s", task.JobLabel, endTime.Sub(startTime), err, stderrBuf.String()) - // Update task state to failed - e.statesMutex.Lock() - e.taskStates[getTaskKey(task)] = TaskFailed - e.statesMutex.Unlock() - } - - // Decrement running jobs counter - e.runningMutex.Lock() - e.runningJobs-- - e.runningMutex.Unlock() - - // Mark the job as completed - e.completedMutex.Lock() - e.completedJobs[getTaskKey(task)] = true - for _, output := range task.Config.Outputs { - e.completedJobs[output] = true - } - e.completedMutex.Unlock() - - // Mark as failed if needed - if !success { - e.failedMutex.Lock() - e.failedJob = true - e.failedMutex.Unlock() - - // Log the failed task - log.Printf("Task failed: %s. Not scheduling any more tasks. Stderr: %s", task.JobLabel, stderrBuf.String()) - } - - e.wg.Done() -} - -// resolveExecutableFromRunfiles resolves the executable path from runfiles -func resolveExecutableFromRunfiles(jobLabel string) string { - // Get the runfiles directory from the environment - runfilesDir := os.Getenv("RUNFILES_DIR") - if runfilesDir == "" { - // If RUNFILES_DIR is not set, try to find it from the executable path - execPath, err := os.Executable() - if err == nil { - runfilesDir = execPath + ".runfiles" - } - } - - // If we still don't have a runfiles directory, fall back to the old behavior - if runfilesDir == "" { - log.Printf("Warning: RUNFILES_DIR not found, falling back to direct executable path") - return jobLabel + ".exec" - } - - // Construct the path to the executable in the runfiles directory - // The executable should be in the _main directory with the job label name + ".exec" - // Extract the target name from the job label (which might be in the format "//package:target") - targetName := jobLabel - if colonIndex := strings.LastIndex(jobLabel, ":"); colonIndex != -1 { - targetName = jobLabel[colonIndex+1:] - } else { - // If there's no colon, it might be a path like "//package/target" - targetName = filepath.Base(jobLabel) - } - - execName := targetName + ".exec" - execPath := filepath.Join(runfilesDir, "_main", execName) - - log.Printf("Resolved executable path: %s", execPath) - return execPath -} - -// recordJobResult records the result of a job execution -func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) { - result := JobResult{ - Task: task, - Success: success, - Error: err, - StartTime: startTime, - EndTime: endTime, - StdOut: stdout, - StdErr: stderr, - } - - e.resultsMutex.Lock() - e.jobResults = append(e.jobResults, result) - e.resultsMutex.Unlock() -} - -// logRunningJobsPeriodically logs the tasks in each state periodically -func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // Count tasks in each state - notReadyTasks := []string{} - readyTasks := []string{} - scheduledOrRunningTasks := []string{} - failedTasks := []string{} - succeededTasks := []string{} - - e.statesMutex.Lock() - for taskKey, state := range e.taskStates { - // Extract the job label from the task key for display - jobLabel := taskKey - if idx := strings.Index(taskKey, "|"); idx > 0 { - jobLabel = taskKey[:idx] - } - - switch state { - case TaskNotReady: - notReadyTasks = append(notReadyTasks, jobLabel) - case TaskReady: - readyTasks = append(readyTasks, jobLabel) - case TaskScheduled, TaskRunning: - scheduledOrRunningTasks = append(scheduledOrRunningTasks, jobLabel) - case TaskFailed: - failedTasks = append(failedTasks, jobLabel) - case TaskSucceeded: - succeededTasks = append(succeededTasks, jobLabel) - } - } - e.statesMutex.Unlock() - - // Log the counts and task labels for each state - log.Printf("Task Status Summary:") - log.Printf(" Not Ready (%d): %v", len(notReadyTasks), notReadyTasks) - log.Printf(" Ready (%d): %v", len(readyTasks), readyTasks) - log.Printf(" Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks) - log.Printf(" Failed (%d): %v", len(failedTasks), failedTasks) - log.Printf(" Succeeded (%d): %v", len(succeededTasks), succeededTasks) - case <-stopChan: - return - } - } -} - -func main() { - // Read the job graph from stdin - var graph JobGraph - decoder := json.NewDecoder(os.Stdin) - if err := decoder.Decode(&graph); err != nil { - log.Fatalf("Error decoding job graph: %v", err) - } - - log.Printf("Executing job graph with %d nodes", len(graph.Nodes)) - - // Create an executor with fail-fast enabled - executor := NewExecutor(&graph, true) - - // Execute the graph - results, err := executor.Execute() - - // Log the results - successCount := 0 - failureCount := 0 - for _, result := range results { - if result.Success { - successCount++ - } else { - failureCount++ - } - } - - log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount) - - if err != nil { - log.Fatalf("Execution failed: %v", err) - } - - if failureCount > 0 { - os.Exit(1) - } -} diff --git a/databuild/graph/go_exec_wrapper.sh.tpl b/databuild/graph/rust_execute_wrapper.sh.tpl similarity index 80% rename from databuild/graph/go_exec_wrapper.sh.tpl rename to databuild/graph/rust_execute_wrapper.sh.tpl index b8bee3d..2c23098 100644 --- a/databuild/graph/go_exec_wrapper.sh.tpl +++ b/databuild/graph/rust_execute_wrapper.sh.tpl @@ -5,7 +5,7 @@ set -e %{PREFIX} -EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")_")/execute" +EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")")" # Run the execution exec "${EXECUTABLE_BINARY}" "$@" \ No newline at end of file diff --git a/databuild/rules.bzl b/databuild/rules.bzl index 6ea211c..e0a173d 100644 --- a/databuild/rules.bzl +++ b/databuild/rules.bzl @@ -564,7 +564,7 @@ _databuild_graph_exec = rule( allow_empty = False, ), "_template": attr.label( - default = "@databuild//databuild/graph:go_exec_wrapper.sh.tpl", + default = "@databuild//databuild/graph:rust_execute_wrapper.sh.tpl", allow_single_file = True, ), "_bash_runfiles": attr.label(