WIP

This commit is contained in: parent 13fc7a7c63, commit f045ff31f6
8 changed files with 666 additions and 6 deletions
examples/basic_graph/test/exec_test.sh (new file)
@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -e

# Test the .exec rule
basic_graph.exec < <(basic_graph.analyze /tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie)

# Test the .build rule
basic_graph.build /tmp/databuild/examples/basic_graph/generated_number/pippin_salem_sadie
graph/BUILD
@@ -2,10 +2,17 @@ load("@rules_go//go:def.bzl", "go_binary")

 exports_files([
     "go_analyze_wrapper.sh.tpl",
+    "go_exec_wrapper.sh.tpl",
 ])

 go_binary(
-    name = "go_analyze",
+    name = "analyze",
     srcs = ["analyze.go"],
     visibility = ["//visibility:public"],
 )
+
+go_binary(
+    name = "execute",
+    srcs = ["execute.go"],
+    visibility = ["//visibility:public"],
+)
graph/execute.go (new file)
@@ -0,0 +1,517 @@
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "os"
    "os/exec"
    "strings"
    "sync"
    "time"
)

// DataDepType represents the type of data dependency
type DataDepType string

const (
    Query       DataDepType = "query"
    Materialize DataDepType = "materialize"
)

// DataDep represents a data dependency
type DataDep struct {
    DepType DataDepType `json:"depType"`
    Ref     string      `json:"ref"`
}

// JobConfig represents the configuration for a job
type JobConfig struct {
    Inputs  []DataDep         `json:"inputs"`
    Outputs []string          `json:"outputs"`
    Args    []string          `json:"args"`
    Env     map[string]string `json:"env"`
}

// Task represents a job task
type Task struct {
    JobLabel string    `json:"jobLabel"`
    Config   JobConfig `json:"config"`
}

// JobGraph represents a graph of jobs
type JobGraph struct {
    Outputs []string `json:"outputs"`
    Nodes   []Task   `json:"nodes"`
}

// JobResult represents the result of a job execution
type JobResult struct {
    Task      Task
    Success   bool
    Error     error
    StartTime time.Time
    EndTime   time.Time
    StdOut    string
    StdErr    string
}

// TaskState represents the state of a task
type TaskState string

const (
    TaskNotReady  TaskState = "not_ready"
    TaskReady     TaskState = "ready"
    TaskScheduled TaskState = "scheduled"
    TaskRunning   TaskState = "running"
    TaskFailed    TaskState = "failed"
    TaskSucceeded TaskState = "succeeded"
)

// Executor manages the execution of jobs in the graph
type Executor struct {
    graph          *JobGraph
    completedJobs  map[string]bool
    completedMutex sync.Mutex
    jobResults     []JobResult
    resultsMutex   sync.Mutex
    runningJobs    int
    runningMutex   sync.Mutex
    wg             sync.WaitGroup
    failFast       bool
    failedJob      bool
    failedMutex    sync.Mutex
    taskStates     map[string]TaskState
    statesMutex    sync.Mutex
}

// NewExecutor creates a new executor for the given job graph
func NewExecutor(graph *JobGraph, failFast bool) *Executor {
    executor := &Executor{
        graph:         graph,
        completedJobs: make(map[string]bool),
        failFast:      failFast,
        taskStates:    make(map[string]TaskState),
    }

    // Initialize all tasks as not ready
    for _, task := range graph.Nodes {
        executor.taskStates[task.JobLabel] = TaskNotReady
    }

    return executor
}

// Execute runs all jobs in the graph
func (e *Executor) Execute() ([]JobResult, error) {
    // Start the periodic logging of running jobs
    stopChan := make(chan struct{})
    go e.logRunningJobsPeriodically(5*time.Second, stopChan)

    // Create a channel for tasks to be executed
    taskChan := make(chan Task, len(e.graph.Nodes))

    // Create a channel for signaling when to check for new tasks
    newTasksChan := make(chan struct{}, len(e.graph.Nodes))

    // Start worker goroutines
    numWorkers := 4 // Can be made configurable
    for i := 0; i < numWorkers; i++ {
        go e.worker(taskChan, newTasksChan)
    }

    // Schedule initial tasks (those with no dependencies or with all dependencies satisfied)
    e.scheduleReadyTasks(taskChan)

    // Process new-task signals until no task can make further progress
    done := false
    for !done {
        select {
        case <-newTasksChan:
            // Schedule any newly ready tasks
            e.scheduleReadyTasks(taskChan)
        default:
            // Assume we are done until we find evidence otherwise
            allDone := true

            // Any running job means we are not done
            e.runningMutex.Lock()
            if e.runningJobs > 0 {
                allDone = false
            }
            e.runningMutex.Unlock()

            // With nothing running, the run is complete once no task is
            // ready, queued, or running. Tasks still not ready at that
            // point can never make progress (a dependency failed, or
            // fail-fast stopped scheduling), so waiting for them to reach
            // a terminal state would loop forever.
            if allDone {
                e.statesMutex.Lock()
                for _, state := range e.taskStates {
                    if state == TaskReady || state == TaskScheduled || state == TaskRunning {
                        allDone = false
                        break
                    }
                }
                e.statesMutex.Unlock()
            }

            if allDone {
                done = true
            } else {
                // Sleep a bit to avoid busy waiting
                time.Sleep(100 * time.Millisecond)
            }
        }
    }

    close(taskChan)

    // Stop the periodic logging
    stopChan <- struct{}{}

    // Check if any job failed
    e.failedMutex.Lock()
    failed := e.failedJob
    e.failedMutex.Unlock()

    if failed && e.failFast {
        return e.jobResults, fmt.Errorf("execution failed due to job failure and fail-fast is enabled")
    }

    return e.jobResults, nil
}

// scheduleReadyTasks schedules all tasks that are ready to be executed
func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) {
    // Check if any task has failed and fail-fast is enabled
    e.failedMutex.Lock()
    if e.failedJob && e.failFast {
        e.failedMutex.Unlock()
        return
    }
    e.failedMutex.Unlock()

    for _, task := range e.graph.Nodes {
        if e.isTaskReady(task) {
            // Update task state to ready
            e.statesMutex.Lock()
            currentState := e.taskStates[task.JobLabel]
            if currentState == TaskNotReady {
                e.taskStates[task.JobLabel] = TaskReady
            }
            e.statesMutex.Unlock()

            // Update task state to scheduled and add to execution queue
            e.statesMutex.Lock()
            if e.taskStates[task.JobLabel] == TaskReady {
                e.taskStates[task.JobLabel] = TaskScheduled
                e.statesMutex.Unlock()

                e.wg.Add(1)
                taskChan <- task
            } else {
                e.statesMutex.Unlock()
            }
        }
    }
}

// isTaskReady checks if a task is ready to be executed
func (e *Executor) isTaskReady(task Task) bool {
    // Check if the task has already been completed
    e.completedMutex.Lock()
    if e.completedJobs[task.JobLabel] {
        e.completedMutex.Unlock()
        return false
    }
    e.completedMutex.Unlock()

    // Check if all dependencies are satisfied
    for _, input := range task.Config.Inputs {
        if input.DepType == Materialize {
            e.completedMutex.Lock()
            if !e.completedJobs[input.Ref] {
                e.completedMutex.Unlock()
                return false
            }
            e.completedMutex.Unlock()
        }
    }

    return true
}

// worker processes tasks from the task channel
func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) {
    for task := range taskChan {
        // Check if we should continue execution
        if e.failFast {
            e.failedMutex.Lock()
            if e.failedJob {
                e.failedMutex.Unlock()
                // Mark the skipped task as failed so it still reaches a
                // terminal state for Execute's completion check.
                e.statesMutex.Lock()
                e.taskStates[task.JobLabel] = TaskFailed
                e.statesMutex.Unlock()
                e.wg.Done()
                continue
            }
            e.failedMutex.Unlock()
        }

        // Update task state to running
        e.statesMutex.Lock()
        e.taskStates[task.JobLabel] = TaskRunning
        e.statesMutex.Unlock()

        // Execute the task
        e.executeTask(task)

        // Signal that a task has completed and new tasks might be ready
        newTasksChan <- struct{}{}
    }
}

// executeTask executes a single task
func (e *Executor) executeTask(task Task) {
    // Increment running jobs counter
    e.runningMutex.Lock()
    e.runningJobs++
    e.runningMutex.Unlock()

    // Log start of job
    log.Printf("Starting job: %s", task.JobLabel)

    // Record start time
    startTime := time.Now()

    // Prepare for capturing stdout and stderr
    var stdoutBuf, stderrBuf strings.Builder

    // Execute the job
    cmd := exec.Command(task.JobLabel + ".exec")

    // Set environment variables (only system environment, not job-specific)
    cmd.Env = os.Environ()

    // Convert job config to JSON for stdin
    configJSON, err := json.Marshal(task.Config)
    if err != nil {
        e.failTask(task, err, startTime)
        return
    }

    // Set up stdin pipe
    stdin, err := cmd.StdinPipe()
    if err != nil {
        e.failTask(task, err, startTime)
        return
    }

    // Capture stdout and stderr
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        stdin.Close()
        e.failTask(task, err, startTime)
        return
    }

    stderr, err := cmd.StderrPipe()
    if err != nil {
        stdin.Close()
        e.failTask(task, err, startTime)
        return
    }

    // Start the command
    if err := cmd.Start(); err != nil {
        stdin.Close()
        e.failTask(task, err, startTime)
        return
    }

    // Write config JSON to stdin and close it
    _, err = stdin.Write(configJSON)
    stdin.Close()
    if err != nil {
        e.failTask(task, err, startTime)
        return
    }

    // Copy stdout and stderr to buffers. Both copies must finish before
    // cmd.Wait, which closes the pipes and would otherwise race with the
    // reads and truncate the captured output.
    var copyWG sync.WaitGroup
    copyWG.Add(2)
    go func() {
        defer copyWG.Done()
        io.Copy(&stdoutBuf, stdout)
    }()
    go func() {
        defer copyWG.Done()
        io.Copy(&stderrBuf, stderr)
    }()
    copyWG.Wait()

    // Wait for the command to complete
    err = cmd.Wait()
    endTime := time.Now()

    // Record the result
    success := err == nil
    e.recordJobResult(task, success, err, startTime, endTime, stdoutBuf.String(), stderrBuf.String())

    // Log completion
    if success {
        log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime))
        // Update task state to succeeded
        e.statesMutex.Lock()
        e.taskStates[task.JobLabel] = TaskSucceeded
        e.statesMutex.Unlock()
    } else {
        log.Printf("Job failed: %s (duration: %v) Error: %v", task.JobLabel, endTime.Sub(startTime), err)
        // Update task state to failed
        e.statesMutex.Lock()
        e.taskStates[task.JobLabel] = TaskFailed
        e.statesMutex.Unlock()
    }

    // Decrement running jobs counter
    e.runningMutex.Lock()
    e.runningJobs--
    e.runningMutex.Unlock()

    // Mark the job as completed
    e.completedMutex.Lock()
    e.completedJobs[task.JobLabel] = true
    for _, output := range task.Config.Outputs {
        e.completedJobs[output] = true
    }
    e.completedMutex.Unlock()

    // Mark as failed if needed
    if !success {
        e.failedMutex.Lock()
        e.failedJob = true
        e.failedMutex.Unlock()

        // Log the failed task
        log.Printf("Task failed: %s. Not scheduling any more tasks.", task.JobLabel)
    }

    e.wg.Done()
}

// failTask handles the bookkeeping shared by executeTask's early-return
// error paths: it records the failure, marks the task failed, restores the
// running-jobs counter incremented on entry, and flags the run as failed.
func (e *Executor) failTask(task Task, err error, startTime time.Time) {
    e.recordJobResult(task, false, err, startTime, time.Now(), "", "")

    e.statesMutex.Lock()
    e.taskStates[task.JobLabel] = TaskFailed
    e.statesMutex.Unlock()

    e.runningMutex.Lock()
    e.runningJobs--
    e.runningMutex.Unlock()

    e.failedMutex.Lock()
    e.failedJob = true
    e.failedMutex.Unlock()

    e.wg.Done()
}

// recordJobResult records the result of a job execution
func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) {
    result := JobResult{
        Task:      task,
        Success:   success,
        Error:     err,
        StartTime: startTime,
        EndTime:   endTime,
        StdOut:    stdout,
        StdErr:    stderr,
    }

    e.resultsMutex.Lock()
    e.jobResults = append(e.jobResults, result)
    e.resultsMutex.Unlock()
}

// logRunningJobsPeriodically logs the tasks in each state periodically
func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Collect the tasks in each state
            notReadyTasks := []string{}
            readyTasks := []string{}
            scheduledOrRunningTasks := []string{}
            failedTasks := []string{}
            succeededTasks := []string{}

            e.statesMutex.Lock()
            for taskLabel, state := range e.taskStates {
                switch state {
                case TaskNotReady:
                    notReadyTasks = append(notReadyTasks, taskLabel)
                case TaskReady:
                    readyTasks = append(readyTasks, taskLabel)
                case TaskScheduled, TaskRunning:
                    scheduledOrRunningTasks = append(scheduledOrRunningTasks, taskLabel)
                case TaskFailed:
                    failedTasks = append(failedTasks, taskLabel)
                case TaskSucceeded:
                    succeededTasks = append(succeededTasks, taskLabel)
                }
            }
            e.statesMutex.Unlock()

            // Log the counts and task labels for each state
            log.Printf("Task Status Summary:")
            log.Printf("  Not Ready (%d): %v", len(notReadyTasks), notReadyTasks)
            log.Printf("  Ready (%d): %v", len(readyTasks), readyTasks)
            log.Printf("  Scheduled/Running (%d): %v", len(scheduledOrRunningTasks), scheduledOrRunningTasks)
            log.Printf("  Failed (%d): %v", len(failedTasks), failedTasks)
            log.Printf("  Succeeded (%d): %v", len(succeededTasks), succeededTasks)
        case <-stopChan:
            return
        }
    }
}

func main() {
    // Read the job graph from stdin
    var graph JobGraph
    decoder := json.NewDecoder(os.Stdin)
    if err := decoder.Decode(&graph); err != nil {
        log.Fatalf("Error decoding job graph: %v", err)
    }

    log.Printf("Executing job graph with %d nodes", len(graph.Nodes))

    // Create an executor with fail-fast enabled
    executor := NewExecutor(&graph, true)

    // Execute the graph
    results, err := executor.Execute()

    // Log the results
    successCount := 0
    failureCount := 0
    for _, result := range results {
        if result.Success {
            successCount++
        } else {
            failureCount++
        }
    }

    log.Printf("Execution complete: %d succeeded, %d failed", successCount, failureCount)

    if err != nil {
        log.Fatalf("Execution failed: %v", err)
    }

    if failureCount > 0 {
        os.Exit(1)
    }
}
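For context, a minimal sketch (not part of the commit) of the stdin contract execute.go expects: a JobGraph JSON document whose field names follow the struct tags above. The job labels and partition refs here are hypothetical placeholders, not targets from this repository.

package main

import (
    "encoding/json"
    "log"
    "os"
)

// Types mirror the JSON tags in graph/execute.go.
type DataDep struct {
    DepType string `json:"depType"` // "query" or "materialize"
    Ref     string `json:"ref"`
}

type JobConfig struct {
    Inputs  []DataDep         `json:"inputs"`
    Outputs []string          `json:"outputs"`
    Args    []string          `json:"args"`
    Env     map[string]string `json:"env"`
}

type Task struct {
    JobLabel string    `json:"jobLabel"`
    Config   JobConfig `json:"config"`
}

type JobGraph struct {
    Outputs []string `json:"outputs"`
    Nodes   []Task   `json:"nodes"`
}

func main() {
    // Two-node graph: "report" declares a "materialize" dependency on the
    // partition produced by "generate_number", so the executor will not
    // schedule it until that output is marked complete.
    graph := JobGraph{
        Outputs: []string{"generated_number/report"},
        Nodes: []Task{
            {
                JobLabel: "//examples/basic_graph:generate_number",
                Config: JobConfig{
                    Outputs: []string{"generated_number/pippin_salem_sadie"},
                },
            },
            {
                JobLabel: "//examples/basic_graph:report",
                Config: JobConfig{
                    Inputs: []DataDep{
                        {DepType: "materialize", Ref: "generated_number/pippin_salem_sadie"},
                    },
                    Outputs: []string{"generated_number/report"},
                },
            },
        },
    }

    // Emit the graph JSON; pipe this to the executor's stdin.
    if err := json.NewEncoder(os.Stdout).Encode(graph); err != nil {
        log.Fatal(err)
    }
}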
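At the other end of that contract, executeTask launches "<jobLabel>.exec" for each task and writes the task's JobConfig as JSON to the child's stdin, capturing stdout/stderr into the JobResult. A hedged sketch of a conforming job binary follows; the logging and behavior are illustrative only, since the project's actual job protocol is not shown in this commit.

package main

import (
    "encoding/json"
    "log"
    "os"
)

// jobConfig mirrors the JobConfig JSON tags in graph/execute.go.
type jobConfig struct {
    Inputs []struct {
        DepType string `json:"depType"`
        Ref     string `json:"ref"`
    } `json:"inputs"`
    Outputs []string          `json:"outputs"`
    Args    []string          `json:"args"`
    Env     map[string]string `json:"env"`
}

func main() {
    // The executor writes the task's config to our stdin; a nonzero exit
    // status is what marks the task failed.
    var cfg jobConfig
    if err := json.NewDecoder(os.Stdin).Decode(&cfg); err != nil {
        log.Fatalf("decoding job config: %v", err)
    }

    // A real job would read its inputs and materialize its outputs here;
    // this sketch only acknowledges the contract.
    log.Printf("executing with %d inputs, producing %v", len(cfg.Inputs), cfg.Outputs)
}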
graph/go_analyze_wrapper.sh.tpl
@@ -5,7 +5,7 @@ set -e

 %{PREFIX}

-EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/go_analyze"
+EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/analyze"

 # Run the configuration
 exec "${EXECUTABLE_BINARY}" "$@"
graph/go_exec_wrapper.sh.tpl (new file)
@@ -0,0 +1,11 @@
#!/bin/bash
set -e

%{RUNFILES_PREFIX}

%{PREFIX}

EXECUTABLE_BINARY="$(rlocation "databuild+/graph/$(basename "%{EXECUTABLE_PATH}")_")/execute"

# Run the execution
exec "${EXECUTABLE_BINARY}" "$@"
@@ -1,5 +1,5 @@
 sh_test(
     name = "analyze_test",
     srcs = ["analyze_test.sh"],
-    data = ["//graph:go_analyze"],
+    data = ["//graph:analyze"],
 )
analyze_test.sh
@@ -1,3 +1,3 @@
 #!/usr/bin/env bash

-DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/go_analyze
+DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/analyze_/analyze
rules.bzl
@@ -244,6 +244,17 @@ def databuild_graph(name, jobs, lookup, visibility = None):
         jobs = jobs,
         visibility = visibility,
     )
+    _databuild_graph_exec(
+        name = "%s.exec" % name,
+        jobs = jobs,
+        visibility = visibility,
+    )
+    _databuild_graph_build(
+        name = "%s.build" % name,
+        analyze = "%s.analyze" % name,
+        exec = "%s.exec" % name,
+        visibility = visibility,
+    )


 # TODO there feels like a lot of boilerplate around wrapping a target with a script - can this be simplified?

@@ -379,15 +390,121 @@ _databuild_graph_analyze = rule(
             allow_files = True,
         ),
         "_analyze": attr.label(
-            default = "@databuild//graph:go_analyze",
+            default = "@databuild//graph:analyze",
             executable = True,
             cfg = "target",
-            # cfg = "exec",
         ),
     },
     executable = True,
 )
+
+def _databuild_graph_exec_impl(ctx):
+    script = ctx.actions.declare_file(ctx.label.name)
+
+    # Gather the execute executables
+    execute_executables = [
+        job[DataBuildJobInfo].execute
+        for job in ctx.attr.jobs
+    ]
+
+    ctx.actions.expand_template(
+        template = ctx.file._template,
+        output = script,
+        substitutions = {
+            "%{EXECUTABLE_PATH}": ctx.attr._execute.files_to_run.executable.path,
+            "%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
+            "%{PREFIX}": "",
+        },
+        is_executable = True,
+    )
+
+    runfiles = ctx.runfiles(
+        files = [ctx.executable._execute] + execute_executables,
+    ).merge(ctx.attr._execute.default_runfiles).merge(
+        ctx.attr._bash_runfiles.default_runfiles
+    ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
+
+    # Merge runfiles from all execute targets
+    for job in ctx.attr.jobs:
+        execute_target = job[DataBuildJobInfo].execute
+        if hasattr(execute_target, "default_runfiles"):
+            runfiles = runfiles.merge(execute_target.default_runfiles)
+
+    return [
+        DefaultInfo(
+            executable = script,
+            runfiles = runfiles,
+        ),
+    ]
+
+_databuild_graph_exec = rule(
+    implementation = _databuild_graph_exec_impl,
+    attrs = {
+        "jobs": attr.label_list(
+            doc = "The list of jobs that are candidates for building partitions in this databuild graph",
+            allow_empty = False,
+        ),
+        "_template": attr.label(
+            default = "@databuild//graph:go_exec_wrapper.sh.tpl",
+            allow_single_file = True,
+        ),
+        "_bash_runfiles": attr.label(
+            default = Label("@bazel_tools//tools/bash/runfiles"),
+            allow_files = True,
+        ),
+        "_execute": attr.label(
+            default = "@databuild//graph:execute",
+            executable = True,
+            cfg = "target",
+        ),
+    },
+    executable = True,
+)
+
+def _databuild_graph_build_impl(ctx):
+    """Wraps the analyze and execute targets in a shell script."""
+    script = ctx.actions.declare_file(ctx.label.name)
+    ctx.actions.write(
+        output = script,
+        is_executable = True,
+        content = RUNFILES_PREFIX + """
+$(rlocation _main/{analyze_path}) $@ | $(rlocation _main/{exec_path})
+""".format(
+            analyze_path = ctx.attr.analyze.files_to_run.executable.short_path,
+            exec_path = ctx.attr.exec.files_to_run.executable.short_path,
+        ),
+    )
+
+    runfiles = ctx.runfiles(
+        files = [ctx.executable.analyze, ctx.executable.exec],
+    ).merge(ctx.attr.analyze.default_runfiles).merge(ctx.attr.exec.default_runfiles)
+
+    return [
+        DefaultInfo(
+            executable = script,
+            runfiles = runfiles,
+        ),
+    ]
+
+_databuild_graph_build = rule(
+    implementation = _databuild_graph_build_impl,
+    attrs = {
+        "analyze": attr.label(
+            doc = "Target that implements the graph analysis logic",
+            mandatory = True,
+            executable = True,
+            cfg = "exec",
+        ),
+        "exec": attr.label(
+            doc = "Target that implements the graph execution logic",
+            mandatory = True,
+            executable = True,
+            cfg = "target",
+        ),
+    },
+    executable = True,
+)

 #def _graph_impl(name):
 #    """
 #