This commit is contained in:
parent
91d5fd26bc
commit
2b2c136c67
4 changed files with 4 additions and 597 deletions
@@ -2,19 +2,12 @@ load("@rules_go//go:def.bzl", "go_binary")
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library")

exports_files([
    "go_analyze_wrapper.sh.tpl",
    "go_exec_wrapper.sh.tpl",
    "rust_analyze_wrapper.sh.tpl",
    "rust_execute_wrapper.sh.tpl",
])

-go_binary(
-    name = "execute",
-    srcs = ["execute.go"],
-    visibility = ["//visibility:public"],
-)
-
rust_binary(
-    name = "execute_rs",
+    name = "execute",
    srcs = ["execute.rs"],
    edition = "2021",
    deps = [
@@ -1,586 +0,0 @@
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// DataDepType represents the type of data dependency
type DataDepType string

const (
	Query       DataDepType = "query"
	Materialize DataDepType = "materialize"
)

// DataDep represents a data dependency
type DataDep struct {
	DepType DataDepType `json:"depType"`
	Ref     string      `json:"ref"`
}

// JobConfig represents the configuration for a job
type JobConfig struct {
	Inputs  []DataDep         `json:"inputs"`
	Outputs []string          `json:"outputs"`
	Args    []string          `json:"args"`
	Env     map[string]string `json:"env"`
}

// Task represents a job task
type Task struct {
	JobLabel string    `json:"jobLabel"`
	Config   JobConfig `json:"config"`
}

// getTaskKey generates a unique key for a task based on its JobLabel and input/output references
func getTaskKey(task Task) string {
	// Start with the job label
	key := task.JobLabel

	// Add input references to the key
	for _, input := range task.Config.Inputs {
		key += "|input:" + input.Ref
	}

	// Add output references to the key
	for _, output := range task.Config.Outputs {
		key += "|output:" + output
	}

	return key
}

// JobGraph represents a graph of jobs
type JobGraph struct {
	Outputs []string `json:"outputs"`
	Nodes   []Task   `json:"nodes"`
}

// JobResult represents the result of a job execution
type JobResult struct {
	Task      Task
	Success   bool
	Error     error
	StartTime time.Time
	EndTime   time.Time
	StdOut    string
	StdErr    string
}

// TaskState represents the state of a task
type TaskState string

const (
	TaskNotReady  TaskState = "not_ready"
	TaskReady     TaskState = "ready"
	TaskScheduled TaskState = "scheduled"
	TaskRunning   TaskState = "running"
	TaskFailed    TaskState = "failed"
	TaskSucceeded TaskState = "succeeded"
)

// Executor manages the execution of jobs in the graph
type Executor struct {
	graph          *JobGraph
	completedJobs  map[string]bool
	completedMutex sync.Mutex
	jobResults     []JobResult
	resultsMutex   sync.Mutex
	runningJobs    int
	runningMutex   sync.Mutex
	wg             sync.WaitGroup
	failFast       bool
	failedJob      bool
	failedMutex    sync.Mutex
	taskStates     map[string]TaskState
	statesMutex    sync.Mutex
}

// NewExecutor creates a new executor for the given job graph
func NewExecutor(graph *JobGraph, failFast bool) *Executor {
	executor := &Executor{
		graph:         graph,
		completedJobs: make(map[string]bool),
		failFast:      failFast,
		taskStates:    make(map[string]TaskState),
	}

	// Initialize all tasks as not ready
	for _, task := range graph.Nodes {
		executor.taskStates[getTaskKey(task)] = TaskNotReady
	}

	return executor
}

// Execute runs all jobs in the graph
func (e *Executor) Execute() ([]JobResult, error) {
	// Start the periodic logging of running jobs
	stopChan := make(chan struct{})
	go e.logRunningJobsPeriodically(5*time.Second, stopChan)

	// Create a channel for tasks to be executed
	taskChan := make(chan Task, len(e.graph.Nodes))

	// Create a channel for signaling when to check for new tasks
	newTasksChan := make(chan struct{}, len(e.graph.Nodes))

	// Start worker goroutines
	numWorkers := 4 // Can be made configurable
	for i := 0; i < numWorkers; i++ {
		go e.worker(taskChan, newTasksChan)
	}

	// Schedule initial tasks (those with no dependencies or with all dependencies satisfied)
	e.scheduleReadyTasks(taskChan)

	// Process new tasks signals until all tasks are complete
	done := false
	for !done {
		select {
		case <-newTasksChan:
			// Schedule any new ready tasks
			e.scheduleReadyTasks(taskChan)
		default:
			// Check if all tasks are done
			allDone := true

			// Check if there are any running tasks
			e.runningMutex.Lock()
			if e.runningJobs > 0 {
				allDone = false
			}
			e.runningMutex.Unlock()

			// Check if all tasks are in a terminal state (succeeded or failed)
			if allDone {
				e.statesMutex.Lock()
				for _, state := range e.taskStates {
					if state != TaskSucceeded && state != TaskFailed {
						allDone = false
						break
					}
				}
				e.statesMutex.Unlock()
			}

			if allDone {
				done = true
			} else {
				// Sleep a bit to avoid busy waiting
				time.Sleep(100 * time.Millisecond)
			}
		}
	}

	close(taskChan)

	// Stop the periodic logging
	stopChan <- struct{}{}

	// Check if any job failed
	e.failedMutex.Lock()
	failed := e.failedJob
	e.failedMutex.Unlock()

	if failed && e.failFast {
		return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled")
	}

	return e.jobResults, nil
}

// scheduleReadyTasks schedules all tasks that are ready to be executed
func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) {
	// Check if any task has failed and fail-fast is enabled
	e.failedMutex.Lock()
	if e.failedJob && e.failFast {
		e.failedMutex.Unlock()
		return
	}
	e.failedMutex.Unlock()

	for _, task := range e.graph.Nodes {
		if e.isTaskReady(task) {
			// Update task state to ready
			taskKey := getTaskKey(task)
			e.statesMutex.Lock()
			currentState := e.taskStates[taskKey]
			if currentState == TaskNotReady {
				e.taskStates[taskKey] = TaskReady
			}
			e.statesMutex.Unlock()

			// Update task state to scheduled and add to execution queue
			e.statesMutex.Lock()
			if e.taskStates[taskKey] == TaskReady {
				e.taskStates[taskKey] = TaskScheduled
				e.statesMutex.Unlock()

				e.wg.Add(1)
				taskChan <- task
			} else {
				e.statesMutex.Unlock()
			}
		}
	}
}

// isTaskReady checks if a task is ready to be executed
func (e *Executor) isTaskReady(task Task) bool {
	// Check if the task has already been completed
	taskKey := getTaskKey(task)
	e.completedMutex.Lock()
	if e.completedJobs[taskKey] {
		e.completedMutex.Unlock()
		return false
	}
	e.completedMutex.Unlock()

	// Check if all dependencies are satisfied
	for _, input := range task.Config.Inputs {
		if input.DepType == Materialize {
			e.completedMutex.Lock()
			if !e.completedJobs[input.Ref] {
				e.completedMutex.Unlock()
				return false
			}
			e.completedMutex.Unlock()
		}
	}

	return true
}

// worker processes tasks from the task channel
func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) {
	for task := range taskChan {
		// Check if we should continue execution
		if e.failFast {
			e.failedMutex.Lock()
			if e.failedJob {
				e.failedMutex.Unlock()
				e.wg.Done()
				continue
			}
			e.failedMutex.Unlock()
		}

		// Update task state to running
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskRunning
		e.statesMutex.Unlock()

		// Execute the task
		e.executeTask(task)

		// Signal that a task has completed and new tasks might be ready
		newTasksChan <- struct{}{}
	}
}

// executeTask executes a single task
func (e *Executor) executeTask(task Task) {
	// Increment running jobs counter
	e.runningMutex.Lock()
	e.runningJobs++
	e.runningMutex.Unlock()

	// Log start of job
	log.Printf("Starting job: %s", task.JobLabel)

	// Record start time
	startTime := time.Now()

	// Prepare for capturing stdout and stderr
	var stdoutBuf, stderrBuf strings.Builder

	// Execute the job
	// Resolve the executable from runfiles
	execPath := resolveExecutableFromRunfiles(task.JobLabel)
	cmd := exec.Command(execPath)

	// Set environment variables (only system environment, not job-specific)
	cmd.Env = os.Environ()

	// Convert job config to JSON for stdin
	configJSON, err := json.Marshal(task.Config)
	if err != nil {
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		return
	}

	// Set up stdin pipe
	stdin, err := cmd.StdinPipe()
	if err != nil {
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		return
	}

	// Capture stdout and stderr
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		stdin.Close()
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		return
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		stdin.Close()
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		return
	}

	// Start the command
	if err := cmd.Start(); err != nil {
		stdin.Close()
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		// Log the error and return
		log.Printf("Error starting command: %v", err)
		return
	}

	// Write config JSON to stdin and close it
	_, err = stdin.Write(configJSON)
	stdin.Close()
	if err != nil {
		e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
		e.wg.Done()
		// Log the error and return
		log.Printf("Error writing config JSON to stdin: %v", err)
		return
	}

	// Copy stdout and stderr to buffers
	go io.Copy(&stdoutBuf, stdout)
	go io.Copy(&stderrBuf, stderr)

	// Wait for the command to complete
	err = cmd.Wait()
	endTime := time.Now()

	// Record the result
	success := err == nil
	e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String())

	// Log completion
	if success {
		log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime))
		// Update task state to succeeded
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskSucceeded
		e.statesMutex.Unlock()
	} else {
		log.Printf("Job failed: %s (duration: %v) Error: %v\nStderr: %s", task.JobLabel, endTime.Sub(startTime), err, stderrBuf.String())
		// Update task state to failed
		e.statesMutex.Lock()
		e.taskStates[getTaskKey(task)] = TaskFailed
		e.statesMutex.Unlock()
	}

	// Decrement running jobs counter
	e.runningMutex.Lock()
	e.runningJobs--
	e.runningMutex.Unlock()

	// Mark the job as completed
	e.completedMutex.Lock()
	e.completedJobs[getTaskKey(task)] = true
	for _, output := range task.Config.Outputs {
		e.completedJobs[output] = true
	}
	e.completedMutex.Unlock()

	// Mark as failed if needed
	if !success {
		e.failedMutex.Lock()
		e.failedJob = true
		e.failedMutex.Unlock()

		// Log the failed task
		log.Printf("Task failed: %s. Not scheduling any more tasks. Stderr: %s", task.JobLabel, stderrBuf.String())
	}

	e.wg.Done()
}

// resolveExecutableFromRunfiles resolves the executable path from runfiles
func resolveExecutableFromRunfiles(jobLabel string) string {
	// Get the runfiles directory from the environment
	runfilesDir := os.Getenv("RUNFILES_DIR")
	if runfilesDir == "" {
		// If RUNFILES_DIR is not set, try to find it from the executable path
		execPath, err := os.Executable()
		if err == nil {
			runfilesDir = execPath + ".runfiles"
		}
	}

	// If we still don't have a runfiles directory, fall back to the old behavior
	if runfilesDir == "" {
		log.Printf("Warning: RUNFILES_DIR not found, falling back to direct executable path")
		return jobLabel + ".exec"
	}

	// Construct the path to the executable in the runfiles directory
	// The executable should be in the _main directory with the job label name + ".exec"
	// Extract the target name from the job label (which might be in the format "//package:target")
	targetName := jobLabel
	if colonIndex := strings.LastIndex(jobLabel, ":"); colonIndex != -1 {
		targetName = jobLabel[colonIndex+1:]
	} else {
		// If there's no colon, it might be a path like "//package/target"
		targetName = filepath.Base(jobLabel)
	}

	execName := targetName + ".exec"
	execPath := filepath.Join(runfilesDir, "_main", execName)

	log.Printf("Resolved executable path: %s", execPath)
	return execPath
}

// recordJobResult records the result of a job execution
func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) {
	result := JobResult{
		Task:      task,
		Success:   success,
		Error:     err,
		StartTime: startTime,
		EndTime:   endTime,
		StdOut:    stdout,
		StdErr:    stderr,
	}

	e.resultsMutex.Lock()
	e.jobResults = append(e.jobResults, result)
	e.resultsMutex.Unlock()
}

// logRunningJobsPeriodically logs the tasks in each state periodically
func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Count tasks in each state
			notReadyTasks := []string{}
			readyTasks := []string{}
			scheduledOrRunningTasks := []string{}
			failedTasks := []string{}
			succeededTasks := []string{}

			e.statesMutex.Lock()
			for taskKey, state := range e.taskStates {
				// Extract the job label from the task key for display
				jobLabel := taskKey
				if idx := strings.Index(taskKey, "|"); idx > 0 {
					jobLabel = taskKey[:idx]
				}

				switch state {
				case TaskNotReady:
					notReadyTasks = append(notReadyTasks, jobLabel)
				case TaskReady:
					readyTasks = append(readyTasks, jobLabel)
				case TaskScheduled, TaskRunning:
					scheduledOrRunningTasks = append(scheduledOrRunningTasks, jobLabel)
				case TaskFailed:
					failedTasks = append(failedTasks, jobLabel)
				case TaskSucceeded:
					succeededTasks = append(succeededTasks, jobLabel)
				}
			}
			e.statesMutex.Unlock()

			// Log the counts and task labels for each state
			log.Printf("Task Status Summary:")
			log.Printf(" Not Ready (%d): %v", len(notReadyTasks), notReadyTasks)
			log.Printf(" Ready (%d): %v", len(readyTasks), readyTasks)
			log.Printf(" Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks)
			log.Printf(" Failed (%d): %v", len(failedTasks), failedTasks)
			log.Printf(" Succeeded (%d): %v", len(succeededTasks), succeededTasks)
		case <-stopChan:
			return
		}
	}
}

func main() {
	// Read the job graph from stdin
	var graph JobGraph
	decoder := json.NewDecoder(os.Stdin)
	if err := decoder.Decode(&graph); err != nil {
		log.Fatalf("Error decoding job graph: %v", err)
	}

	log.Printf("Executing job graph with %d nodes", len(graph.Nodes))

	// Create an executor with fail-fast enabled
	executor := NewExecutor(&graph, true)

	// Execute the graph
	results, err := executor.Execute()

	// Log the results
	successCount := 0
	failureCount := 0
	for _, result := range results {
		if result.Success {
			successCount++
		} else {
			failureCount++
		}
	}

	log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount)

	if err != nil {
		log.Fatalf("Execution failed: %v", err)
	}

	if failureCount > 0 {
		os.Exit(1)
	}
}
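For reference, the deleted Go executor read a JobGraph as JSON on stdin and then wrote each task's JobConfig as JSON to that task's stdin. A minimal illustrative input is sketched below: the field names follow the json struct tags in the file above, while the labels, refs, and values are hypothetical placeholders, not taken from this repository.

{
  "outputs": ["//data:report"],
  "nodes": [
    {
      "jobLabel": "//jobs:extract",
      "config": {
        "inputs": [],
        "outputs": ["//data:raw"],
        "args": [],
        "env": {}
      }
    },
    {
      "jobLabel": "//jobs:report",
      "config": {
        "inputs": [{"depType": "materialize", "ref": "//data:raw"}],
        "outputs": ["//data:report"],
        "args": [],
        "env": {}
      }
    }
  ]
}

With an input shaped like this, the second node only became ready after the first completed, because isTaskReady required every "materialize" input ref to appear in completedJobs, which executeTask populated from each finished task's outputs.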
@@ -5,7 +5,7 @@ set -e

%{PREFIX}

-EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")_")/execute"
+EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")")"

# Run the execution
exec "${EXECUTABLE_BINARY}" "$@"
@@ -564,7 +564,7 @@ _databuild_graph_exec = rule(
            allow_empty = False,
        ),
        "_template": attr.label(
-            default = "@databuild//databuild/graph:go_exec_wrapper.sh.tpl",
+            default = "@databuild//databuild/graph:rust_execute_wrapper.sh.tpl",
            allow_single_file = True,
        ),
        "_bash_runfiles": attr.label(
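The rule change above only swaps which wrapper template _databuild_graph_exec instantiates; the rule's implementation is not part of this diff. As a rough sketch of how such a template is typically consumed, a Starlark rule can expand it with ctx.actions.expand_template, filling the %{PREFIX} and %{EXECUTABLE_PATH} placeholders that appear in the wrapper script. Everything below other than the "_template" attribute and those two placeholder names is an assumption for illustration, not this repository's actual code.

# Hypothetical sketch only; names like _databuild_graph_exec_impl and
# _executor are assumed, not taken from the diff.
def _databuild_graph_exec_impl(ctx):
    wrapper = ctx.actions.declare_file(ctx.label.name + ".sh")
    ctx.actions.expand_template(
        template = ctx.file._template,
        output = wrapper,
        is_executable = True,
        substitutions = {
            # e.g. runfiles initialization lines
            "%{PREFIX}": "",
            # assumed executable attribute providing the execute binary
            "%{EXECUTABLE_PATH}": ctx.executable._executor.short_path,
        },
    )
    return [DefaultInfo(
        executable = wrapper,
        runfiles = ctx.runfiles(files = [ctx.executable._executor]),
    )]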