diff --git a/examples/basic_graph/test/exec_test.sh b/examples/basic_graph/test/exec_test.sh
new file mode 100644
index 0000000..3d6b4ba
--- /dev/null
+++ b/examples/basic_graph/test/exec_test.sh
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+set -e
+
+# Test the .exec rule
+basic_graph.exec < <(basic_graph.analyze /tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie)
+
+# Test the .build rule
+basic_graph.build /tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie
\ No newline at end of file
diff --git a/graph/BUILD.bazel b/graph/BUILD.bazel
index f47b129..29f5a53 100644
--- a/graph/BUILD.bazel
+++ b/graph/BUILD.bazel
@@ -2,10 +2,17 @@ load("@rules_go//go:def.bzl", "go_binary")
 
 exports_files([
     "go_analyze_wrapper.sh.tpl",
+    "go_exec_wrapper.sh.tpl",
 ])
 
 go_binary(
-    name = "go_analyze",
+    name = "analyze",
     srcs = ["analyze.go"],
     visibility = ["//visibility:public"],
 )
+
+go_binary(
+    name = "execute",
+    srcs = ["execute.go"],
+    visibility = ["//visibility:public"],
+)
diff --git a/graph/execute.go b/graph/execute.go
new file mode 100644
index 0000000..674c132
--- /dev/null
+++ b/graph/execute.go
@@ -0,0 +1,517 @@
+package main
+
+import (
+    "encoding/json"
+    "fmt"
+    "io"
+    "log"
+    "os"
+    "os/exec"
+    "strings"
+    "sync"
+    "time"
+)
+
+// DataDepType represents the type of data dependency
+type DataDepType string
+
+const (
+    Query       DataDepType = "query"
+    Materialize DataDepType = "materialize"
+)
+
+// DataDep represents a data dependency
+type DataDep struct {
+    DepType DataDepType `json:"depType"`
+    Ref     string      `json:"ref"`
+}
+
+// JobConfig represents the configuration for a job
+type JobConfig struct {
+    Inputs  []DataDep         `json:"inputs"`
+    Outputs []string          `json:"outputs"`
+    Args    []string          `json:"args"`
+    Env     map[string]string `json:"env"`
+}
+
+// Task represents a job task
+type Task struct {
+    JobLabel string    `json:"jobLabel"`
+    Config   JobConfig `json:"config"`
+}
+
+// JobGraph represents a graph of jobs
+type JobGraph struct {
+    Outputs []string `json:"outputs"`
+    Nodes   []Task   `json:"nodes"`
+}
+
+// JobResult represents the result of a job execution
+type JobResult struct {
+    Task      Task
+    Success   bool
+    Error     error
+    StartTime time.Time
+    EndTime   time.Time
+    StdOut    string
+    StdErr    string
+}
+
+// TaskState represents the state of a task
+type TaskState string
+
+const (
+    TaskNotReady  TaskState = "not_ready"
+    TaskReady     TaskState = "ready"
+    TaskScheduled TaskState = "scheduled"
+    TaskRunning   TaskState = "running"
+    TaskFailed    TaskState = "failed"
+    TaskSucceeded TaskState = "succeeded"
+)
+
+// Executor manages the execution of jobs in the graph
+type Executor struct {
+    graph          *JobGraph
+    completedJobs  map[string]bool
+    completedMutex sync.Mutex
+    jobResults     []JobResult
+    resultsMutex   sync.Mutex
+    runningJobs    int
+    runningMutex   sync.Mutex
+    wg             sync.WaitGroup
+    failFast       bool
+    failedJob      bool
+    failedMutex    sync.Mutex
+    taskStates     map[string]TaskState
+    statesMutex    sync.Mutex
+}
+
+// NewExecutor creates a new executor for the given job graph
+func NewExecutor(graph *JobGraph, failFast bool) *Executor {
+    executor := &Executor{
+        graph:         graph,
+        completedJobs: make(map[string]bool),
+        failFast:      failFast,
+        taskStates:    make(map[string]TaskState),
+    }
+
+    // Initialize all tasks as not ready
+    for _, task := range graph.Nodes {
+        executor.taskStates[task.JobLabel] = TaskNotReady
+    }
+
+    return executor
+}
+
+// Execute runs all jobs in the graph
+func (e *Executor) Execute() ([]JobResult, error) {
+    // Start the periodic logging of running jobs
+    stopChan := make(chan struct{})
+    go e.logRunningJobsPeriodically(5*time.Second, stopChan)
+
+    // Create a channel for tasks to be executed
+    taskChan := make(chan Task, len(e.graph.Nodes))
+
+    // Create a channel for signaling when to check for new tasks
+    newTasksChan := make(chan struct{}, len(e.graph.Nodes))
+
+    // Start worker goroutines
+    numWorkers := 4 // Can be made configurable
+    for i := 0; i < numWorkers; i++ {
+        go e.worker(taskChan, newTasksChan)
+    }
+
+    // Schedule initial tasks (those with no dependencies or with all dependencies satisfied)
+    e.scheduleReadyTasks(taskChan)
+
+    // Process new tasks signals until all tasks are complete
+    done := false
+    for !done {
+        select {
+        case <-newTasksChan:
+            // Schedule any new ready tasks
+            e.scheduleReadyTasks(taskChan)
+        default:
+            // Check if all tasks are done
+            allDone := true
+
+            // Check if there are any running tasks
+            e.runningMutex.Lock()
+            if e.runningJobs > 0 {
+                allDone = false
+            }
+            e.runningMutex.Unlock()
+
+            // Check if all tasks are in a terminal state (succeeded or failed)
+            if allDone {
+                e.statesMutex.Lock()
+                for _, state := range e.taskStates {
+                    if state != TaskSucceeded && state != TaskFailed {
+                        allDone = false
+                        break
+                    }
+                }
+                e.statesMutex.Unlock()
+            }
+
+            if allDone {
+                done = true
+            } else {
+                // Sleep a bit to avoid busy waiting
+                time.Sleep(100 * time.Millisecond)
+            }
+        }
+    }
+
+    close(taskChan)
+
+    // Stop the periodic logging
+    stopChan <- struct{}{}
+
+    // Check if any job failed
+    e.failedMutex.Lock()
+    failed := e.failedJob
+    e.failedMutex.Unlock()
+
+    if failed && e.failFast {
+        return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled")
+    }
+
+    return e.jobResults, nil
+}
+
+// scheduleReadyTasks schedules all tasks that are ready to be executed
+func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) {
+    // Check if any task has failed and fail-fast is enabled
+    e.failedMutex.Lock()
+    if e.failedJob && e.failFast {
+        e.failedMutex.Unlock()
+        return
+    }
+    e.failedMutex.Unlock()
+
+    for _, task := range e.graph.Nodes {
+        if e.isTaskReady(task) {
+            // Update task state to ready
+            e.statesMutex.Lock()
+            currentState := e.taskStates[task.JobLabel]
+            if currentState == TaskNotReady {
+                e.taskStates[task.JobLabel] = TaskReady
+            }
+            e.statesMutex.Unlock()
+
+            // Update task state to scheduled and add to execution queue
+            e.statesMutex.Lock()
+            if e.taskStates[task.JobLabel] == TaskReady {
+                e.taskStates[task.JobLabel] = TaskScheduled
+                e.statesMutex.Unlock()
+
+                e.wg.Add(1)
+                taskChan <- task
+            } else {
+                e.statesMutex.Unlock()
+            }
+        }
+    }
+}
+
+// isTaskReady checks if a task is ready to be executed
+func (e *Executor) isTaskReady(task Task) bool {
+    // Check if the task has already been completed
+    e.completedMutex.Lock()
+    if e.completedJobs[task.JobLabel] {
+        e.completedMutex.Unlock()
+        return false
+    }
+    e.completedMutex.Unlock()
+
+    // Check if all dependencies are satisfied
+    for _, input := range task.Config.Inputs {
+        if input.DepType == Materialize {
+            e.completedMutex.Lock()
+            if !e.completedJobs[input.Ref] {
+                e.completedMutex.Unlock()
+                return false
+            }
+            e.completedMutex.Unlock()
+        }
+    }
+
+    return true
+}
+
+// worker processes tasks from the task channel
+func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) {
+    for task := range taskChan {
+        // Check if we should continue execution
+        if e.failFast {
+            e.failedMutex.Lock()
+            if e.failedJob {
+                e.failedMutex.Unlock()
+                e.wg.Done()
+                continue
+            }
+            e.failedMutex.Unlock()
+        }
+
+        // Update task state to running
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskRunning
+        e.statesMutex.Unlock()
+
+        // Execute the task
+        e.executeTask(task)
+
+        // Signal that a task has completed and new tasks might be ready
+        newTasksChan <- struct{}{}
+    }
+}
+
+// executeTask executes a single task
+func (e *Executor) executeTask(task Task) {
+    // Increment running jobs counter
+    e.runningMutex.Lock()
+    e.runningJobs++
+    e.runningMutex.Unlock()
+
+    // Log start of job
+    log.Printf("Starting job: %s", task.JobLabel)
+
+    // Record start time
+    startTime := time.Now()
+
+    // Prepare for capturing stdout and stderr
+    var stdoutBuf, stderrBuf strings.Builder
+
+    // Execute the job
+    cmd := exec.Command(task.JobLabel + ".exec")
+
+    // Set environment variables (only system environment, not job-specific)
+    cmd.Env = os.Environ()
+
+    // Convert job config to JSON for stdin
+    configJSON, err := json.Marshal(task.Config)
+    if err != nil {
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    // Set up stdin pipe
+    stdin, err := cmd.StdinPipe()
+    if err != nil {
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    // Capture stdout and stderr
+    stdout, err := cmd.StdoutPipe()
+    if err != nil {
+        stdin.Close()
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    stderr, err := cmd.StderrPipe()
+    if err != nil {
+        stdin.Close()
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    // Start the command
+    if err := cmd.Start(); err != nil {
+        stdin.Close()
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    // Write config JSON to stdin and close it
+    _, err = stdin.Write(configJSON)
+    stdin.Close()
+    if err != nil {
+        e.recordJobResult(task, false, err, startTime, time.Now(), "", "")
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+        e.wg.Done()
+        return
+    }
+
+    // Copy stdout and stderr to buffers
+    go io.Copy(&stdoutBuf, stdout)
+    go io.Copy(&stderrBuf, stderr)
+
+    // Wait for the command to complete
+    err = cmd.Wait()
+    endTime := time.Now()
+
+    // Record the result
+    success := err == nil
+    e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String())
+
+    // Log completion
+    if success {
+        log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime))
+        // Update task state to succeeded
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskSucceeded
+        e.statesMutex.Unlock()
+    } else {
+        log.Printf("Job failed: %s (duration: %v) Error: %v", task.JobLabel, endTime.Sub(startTime), err)
+        // Update task state to failed
+        e.statesMutex.Lock()
+        e.taskStates[task.JobLabel] = TaskFailed
+        e.statesMutex.Unlock()
+    }
+
+    // Decrement running jobs counter
+    e.runningMutex.Lock()
+    e.runningJobs--
+    e.runningMutex.Unlock()
+
+    // Mark the job as completed
+    e.completedMutex.Lock()
+    e.completedJobs[task.JobLabel] = true
+    for _, output := range task.Config.Outputs {
+        e.completedJobs[output] = true
+    }
+    e.completedMutex.Unlock()
+
+    // Mark as failed if needed
+    if !success {
+        e.failedMutex.Lock()
+        e.failedJob = true
+        e.failedMutex.Unlock()
+
+        // Log the failed task
+        log.Printf("Task failed: %s. Not scheduling any more tasks.", task.JobLabel)
+    }
+
+    e.wg.Done()
+}
+
+// recordJobResult records the result of a job execution
+func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) {
+    result := JobResult{
+        Task:      task,
+        Success:   success,
+        Error:     err,
+        StartTime: startTime,
+        EndTime:   endTime,
+        StdOut:    stdout,
+        StdErr:    stderr,
+    }
+
+    e.resultsMutex.Lock()
+    e.jobResults = append(e.jobResults, result)
+    e.resultsMutex.Unlock()
+}
+
+// logRunningJobsPeriodically logs the tasks in each state periodically
+func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) {
+    ticker := time.NewTicker(interval)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ticker.C:
+            // Count tasks in each state
+            notReadyTasks := []string{}
+            readyTasks := []string{}
+            scheduledOrRunningTasks := []string{}
+            failedTasks := []string{}
+            succeededTasks := []string{}
+
+            e.statesMutex.Lock()
+            for taskLabel, state := range e.taskStates {
+                switch state {
+                case TaskNotReady:
+                    notReadyTasks = append(notReadyTasks, taskLabel)
+                case TaskReady:
+                    readyTasks = append(readyTasks, taskLabel)
+                case TaskScheduled, TaskRunning:
+                    scheduledOrRunningTasks = append(scheduledOrRunningTasks, taskLabel)
+                case TaskFailed:
+                    failedTasks = append(failedTasks, taskLabel)
+                case TaskSucceeded:
+                    succeededTasks = append(succeededTasks, taskLabel)
+                }
+            }
+            e.statesMutex.Unlock()
+
+            // Log the counts and task labels for each state
+            log.Printf("Task Status Summary:")
+            log.Printf("  Not Ready (%d): %v", len(notReadyTasks), notReadyTasks)
+            log.Printf("  Ready (%d): %v", len(readyTasks), readyTasks)
+            log.Printf("  Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks)
+            log.Printf("  Failed (%d): %v", len(failedTasks), failedTasks)
+            log.Printf("  Succeeded (%d): %v", len(succeededTasks), succeededTasks)
+        case <-stopChan:
+            return
+        }
+    }
+}
+
+func main() {
+    // Read the job graph from stdin
+    var graph JobGraph
+    decoder := json.NewDecoder(os.Stdin)
+    if err := decoder.Decode(&graph); err != nil {
+        log.Fatalf("Error decoding job graph: %v", err)
+    }
+
+    log.Printf("Executing job graph with %d nodes", len(graph.Nodes))
+
+    // Create an executor with fail-fast enabled
+    executor := NewExecutor(&graph, true)
+
+    // Execute the graph
+    results, err := executor.Execute()
+
+    // Log the results
+    successCount := 0
+    failureCount := 0
+    for _, result := range results {
+        if result.Success {
+            successCount++
+        } else {
+            failureCount++
+        }
+    }
+
+    log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount)
+
+    if err != nil {
+        log.Fatalf("Execution failed: %v", err)
+    }
+
+    if failureCount > 0 {
+        os.Exit(1)
+    }
+}
diff --git a/graph/go_analyze_wrapper.sh.tpl b/graph/go_analyze_wrapper.sh.tpl
index bc92fca..2c4f057 100755
--- a/graph/go_analyze_wrapper.sh.tpl
+++ b/graph/go_analyze_wrapper.sh.tpl
@@ -5,7 +5,7 @@ set -e
 
 %{PREFIX}
 
-EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/go_analyze"
"%{EXECUTABLE_PATH}")_")/go_analyze" +EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/analyze" # Run the configuration exec "${EXECUTABLE_BINARY}" "$@" diff --git a/graph/go_exec_wrapper.sh.tpl b/graph/go_exec_wrapper.sh.tpl new file mode 100644 index 0000000..7a7e3ec --- /dev/null +++ b/graph/go_exec_wrapper.sh.tpl @@ -0,0 +1,11 @@ +#!/bin/bash +set -e + +%{RUNFILES_PREFIX} + +%{PREFIX} + +EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/execute" + +# Run the execution +exec "${EXECUTABLE_BINARY}" "$@" \ No newline at end of file diff --git a/graph/test/BUILD.bazel b/graph/test/BUILD.bazel index ca4c9ef..ccae68b 100644 --- a/graph/test/BUILD.bazel +++ b/graph/test/BUILD.bazel @@ -1,5 +1,5 @@ sh_test( name = "analyze_test", srcs = ["analyze_test.sh"], - data = ["//graph:go_analyze"], + data = ["//graph:analyze"], ) \ No newline at end of file diff --git a/graph/test/analyze_test.sh b/graph/test/analyze_test.sh index dc25712..f38db9f 100755 --- a/graph/test/analyze_test.sh +++ b/graph/test/analyze_test.sh @@ -1,3 +1,3 @@ #!/usr/bin/env bash -DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/go_analyze +DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/analyze_/analyze diff --git a/rules.bzl b/rules.bzl index 8f67c23..eaba414 100644 --- a/rules.bzl +++ b/rules.bzl @@ -244,6 +244,17 @@ def databuild_graph(name, jobs, lookup, visibility = None): jobs = jobs, visibility = visibility, ) + _databuild_graph_exec( + name = "%s.exec" % name, + jobs = jobs, + visibility = visibility, + ) + _databuild_graph_build( + name = "%s.build" % name, + analyze = "%s.analyze" % name, + exec = "%s.exec" % name, + visibility = visibility, + ) # TODO there feels like a lot of boilerplate around wrapping a target with a script - can this be simplified? 
@@ -379,15 +390,121 @@ _databuild_graph_analyze = rule(
             allow_files = True,
         ),
         "_analyze": attr.label(
-            default = "@databuild//graph:go_analyze",
+            default = "@databuild//graph:analyze",
             executable = True,
             cfg = "target",
-#            cfg = "exec",
         )
     },
     executable = True,
 )
 
+def _databuild_graph_exec_impl(ctx):
+    script = ctx.actions.declare_file(ctx.label.name)
+
+    # Gather the execute executables
+    execute_executables = [
+        job[DataBuildJobInfo].execute
+        for job in ctx.attr.jobs
+    ]
+
+    ctx.actions.expand_template(
+        template = ctx.file._template,
+        output = script,
+        substitutions = {
+            "%{EXECUTABLE_PATH}": ctx.attr._execute.files_to_run.executable.path,
+            "%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
+            "%{PREFIX}": "",
+        },
+        is_executable = True,
+    )
+
+    runfiles = ctx.runfiles(
+        files = [ctx.executable._execute] + execute_executables,
+    ).merge(ctx.attr._execute.default_runfiles).merge(
+        ctx.attr._bash_runfiles.default_runfiles
+    ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
+
+    # Merge runfiles from all execute targets
+    for job in ctx.attr.jobs:
+        execute_target = job[DataBuildJobInfo].execute
+        if hasattr(execute_target, "default_runfiles"):
+            runfiles = runfiles.merge(execute_target.default_runfiles)
+
+    return [
+        DefaultInfo(
+            executable = script,
+            runfiles = runfiles,
+        ),
+    ]
+
+_databuild_graph_exec = rule(
+    implementation = _databuild_graph_exec_impl,
+    attrs = {
+        "jobs": attr.label_list(
+            doc = "The list of jobs that are candidates for building partitions in this databuild graph",
+            allow_empty = False,
+        ),
+        "_template": attr.label(
+            default = "@databuild//graph:go_exec_wrapper.sh.tpl",
+            allow_single_file = True,
+        ),
+        "_bash_runfiles": attr.label(
+            default = Label("@bazel_tools//tools/bash/runfiles"),
+            allow_files = True,
+        ),
+        "_execute": attr.label(
+            default = "@databuild//graph:execute",
+            executable = True,
+            cfg = "target",
+        )
+    },
+    executable = True,
+)
+
+def _databuild_graph_build_impl(ctx):
+    """Wraps the analyze and execute targets in a shell script."""
+    script = ctx.actions.declare_file(ctx.label.name)
+    ctx.actions.write(
+        output = script,
+        is_executable = True,
+        content = RUNFILES_PREFIX + """
+$(rlocation _main/{analyze_path}) $@ | $(rlocation _main/{exec_path})
+        """.format(
+            analyze_path = ctx.attr.analyze.files_to_run.executable.short_path,
+            exec_path = ctx.attr.exec.files_to_run.executable.short_path,
+        ),
+    )
+
+    runfiles = ctx.runfiles(
+        files = [ctx.executable.analyze, ctx.executable.exec],
+    ).merge(ctx.attr.analyze.default_runfiles).merge(ctx.attr.exec.default_runfiles)
+
+    return [
+        DefaultInfo(
+            executable = script,
+            runfiles = runfiles,
+        ),
+    ]
+
+_databuild_graph_build = rule(
+    implementation = _databuild_graph_build_impl,
+    attrs = {
+        "analyze": attr.label(
+            doc = "Target that implements the graph analysis logic",
+            mandatory = True,
+            executable = True,
+            cfg = "exec",
+        ),
+        "exec": attr.label(
+            doc = "Target that implements the graph execution logic",
+            mandatory = True,
+            executable = True,
+            cfg = "target",
+        ),
+    },
+    executable = True,
+)
+
 #def _graph_impl(name):
 #    """
 #
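
Note on the executor's stdin contract (a reviewer sketch, not part of the patch): graph/execute.go decodes one JobGraph JSON document from stdin, then for each node runs "<jobLabel>.exec" and writes that node's "config" object to the job's stdin. The field names below come from the struct tags in execute.go; the job label, input ref, and the exact invocation of the generated .exec wrapper are hypothetical placeholders — in normal use the JSON comes from the .analyze target, as exec_test.sh shows.

# Hypothetical hand-written job graph piped into a generated <graph>.exec wrapper.
# A "materialize" input only unblocks a node once another node's jobLabel or one
# of that node's outputs matches the ref; "query" inputs do not gate scheduling.
cat <<'EOF' | ./basic_graph.exec
{
  "outputs": ["/tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie"],
  "nodes": [
    {
      "jobLabel": "generate_number",
      "config": {
        "inputs": [{"depType": "query", "ref": "examples/basic_graph/source_data"}],
        "outputs": ["/tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie"],
        "args": [],
        "env": {}
      }
    }
  ]
}
EOF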