diff --git a/examples/basic_graph/GenerateExecute.java b/examples/basic_graph/GenerateExecute.java index 37d2b0a..0e4064b 100644 --- a/examples/basic_graph/GenerateExecute.java +++ b/examples/basic_graph/GenerateExecute.java @@ -40,14 +40,19 @@ public class GenerateExecute { int randomNumber = random.nextInt(100); // Write the random number to the output file + // Ensure dir exists + java.io.File parent = new java.io.File(partitionRef).getParentFile(); + if (parent != null) { + parent.mkdirs(); + } try (FileWriter writer = new FileWriter(partitionRef)) { writer.write("Random number for partition " + partitionRef + ": " + randomNumber); } - + System.out.println("Generated random number " + randomNumber + " for partition " + partitionRef); // Write the random number to the output file - String outputPath = BASE_PATH + "generated_number/" + partitionRef; + String outputPath = partitionRef; System.out.println("Writing random number " + randomNumber + " to " + outputPath); // Ensure dir exists new java.io.File(outputPath).getParentFile().mkdirs(); @@ -55,7 +60,7 @@ public class GenerateExecute { try (FileWriter writer = new FileWriter(outputPath)) { writer.write(String.valueOf(randomNumber)); } - + } catch (NoSuchAlgorithmException | IOException e) { System.err.println("Error: " + e.getMessage()); e.printStackTrace(); diff --git a/examples/basic_graph/SumConfigure.java b/examples/basic_graph/SumConfigure.java index ad96dc5..197940a 100644 --- a/examples/basic_graph/SumConfigure.java +++ b/examples/basic_graph/SumConfigure.java @@ -29,7 +29,7 @@ public class SumConfigure { // Create and populate JobConfig object JobConfig config = new JobConfig(); - config.outputs = Collections.singletonList(partitionRef); + config.outputs = Collections.singletonList(BASE_PATH + "sum/" +partitionRef); config.inputs = Arrays.stream(upstreams) .map(upstream -> { DataDep dep = new DataDep(); @@ -38,7 +38,7 @@ public class SumConfigure { return dep; }) .collect(Collectors.toList()); - config.args = Collections.singletonList(partitionRef); + config.args = Arrays.asList(upstreams); // Create a hashmap for env with {"OUTPUT_REF": "foo"} config.env = Collections.singletonMap("OUTPUT_REF", args[0]); // inputs and env are already initialized as empty collections in the constructor diff --git a/examples/basic_graph/SumExecute.java b/examples/basic_graph/SumExecute.java index 3a18cf8..f45acae 100644 --- a/examples/basic_graph/SumExecute.java +++ b/examples/basic_graph/SumExecute.java @@ -16,7 +16,7 @@ public class SumExecute { int sum = 0; for (String partitionRef : args) { try { - String path = BASE_PATH + partitionRef; + String path = partitionRef; int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path)))); System.out.println("Summing partition " + partitionRef + " with value " + partitionValue); sum += partitionValue; @@ -27,8 +27,14 @@ public class SumExecute { } System.out.println("Sum of " + args.length + " partitions: " + sum); // Write the sum to the output file - String outPath = BASE_PATH + "sum/" + outputRef; + String outPath = outputRef; System.out.println("Writing sum " + sum + " to " + outPath); + + java.io.File parent = new java.io.File(outPath).getParentFile(); + if (parent != null) { + parent.mkdirs(); + } + try (java.io.FileWriter writer = new java.io.FileWriter(outPath)) { writer.write(String.valueOf(sum)); } catch (Exception e) { diff --git a/examples/basic_graph/test/generate_number_test.sh b/examples/basic_graph/test/generate_number_test.sh index fe1b35b..6b8c00d 100755 --- a/examples/basic_graph/test/generate_number_test.sh +++ b/examples/basic_graph/test/generate_number_test.sh @@ -2,14 +2,14 @@ set -e # Test configure -generate_number_job.cfg pippin salem sadie +generate_number_job.cfg /tmp/databuild/examples/basic_graph/generated_number/pippin /tmp/databuild/examples/basic_graph/generated_number/salem /tmp/databuild/examples/basic_graph/generated_number/sadie # Test run -generate_number_job.cfg pippin | jq -c ".[0]" | generate_number_job.exec +generate_number_job.cfg /tmp/databuild/examples/basic_graph/generated_number/pippin | jq -c ".[0]" | generate_number_job.exec -# Validate that contents of pippin is 43 -if [[ "$(cat /tmp/databuild/examples/basic_graph/generated_number/pippin)" != "43" ]]; then - echo "Assertion failed: File does not contain 43" +# Validate that contents of pippin is 57 +if [[ "$(cat /tmp/databuild/examples/basic_graph/generated_number/pippin)" != "57" ]]; then + echo "Assertion failed: File does not contain 57" cat /tmp/databuild/examples/basic_graph/generated_number/pippin exit 1 fi diff --git a/examples/basic_graph/test/sum_test.sh b/examples/basic_graph/test/sum_test.sh index afbc81c..23628c2 100755 --- a/examples/basic_graph/test/sum_test.sh +++ b/examples/basic_graph/test/sum_test.sh @@ -5,15 +5,15 @@ set -e sum_job.cfg pippin_salem_sadie # Test run -echo -n 43 > /tmp/databuild/examples/basic_graph/generated_number/pippin -echo -n 56 > /tmp/databuild/examples/basic_graph/generated_number/salem -echo -n 40 > /tmp/databuild/examples/basic_graph/generated_number/sadie +echo -n 57 > /tmp/databuild/examples/basic_graph/generated_number/pippin +echo -n 59 > /tmp/databuild/examples/basic_graph/generated_number/salem +echo -n 1 > /tmp/databuild/examples/basic_graph/generated_number/sadie sum_job.cfg pippin_salem_sadie | jq -c ".[0]" | sum_job.exec # Validate that contents of pippin is 43 -if [[ "$(cat /tmp/databuild/examples/basic_graph/pippin_salem_sadie)" != "139" ]]; then - echo "Assertion failed: File does not contain 139" +if [[ "$(cat /tmp/databuild/examples/basic_graph/sum/pippin_salem_sadie)" != "117" ]]; then + echo "Assertion failed: File does not contain 117" cat /tmp/databuild/examples/basic_graph/sum/pippin_salem_sadie exit 1 fi diff --git a/graph/execute.go b/graph/execute.go index 674c132..640f95d 100644 --- a/graph/execute.go +++ b/graph/execute.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/exec" + "path/filepath" "strings" "sync" "time" @@ -40,6 +41,24 @@ type Task struct { Config JobConfig `json:"config"` } +// getTaskKey generates a unique key for a task based on its JobLabel and input/output references +func getTaskKey(task Task) string { + // Start with the job label + key := task.JobLabel + + // Add input references to the key + for _, input := range task.Config.Inputs { + key += "|input:" + input.Ref + } + + // Add output references to the key + for _, output := range task.Config.Outputs { + key += "|output:" + output + } + + return key +} + // JobGraph represents a graph of jobs type JobGraph struct { Outputs []string `json:"outputs"` @@ -97,7 +116,7 @@ func NewExecutor(graph *JobGraph, failFast bool) *Executor { // Initialize all tasks as not ready for _, task := range graph.Nodes { - executor.taskStates[task.JobLabel] = TaskNotReady + executor.taskStates[getTaskKey(task)] = TaskNotReady } return executor @@ -193,17 +212,18 @@ func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) { for _, task := range e.graph.Nodes { if e.isTaskReady(task) { // Update task state to ready + taskKey := getTaskKey(task) e.statesMutex.Lock() - currentState := e.taskStates[task.JobLabel] + currentState := e.taskStates[taskKey] if currentState == TaskNotReady { - e.taskStates[task.JobLabel] = TaskReady + e.taskStates[taskKey] = TaskReady } e.statesMutex.Unlock() // Update task state to scheduled and add to execution queue e.statesMutex.Lock() - if e.taskStates[task.JobLabel] == TaskReady { - e.taskStates[task.JobLabel] = TaskScheduled + if e.taskStates[taskKey] == TaskReady { + e.taskStates[taskKey] = TaskScheduled e.statesMutex.Unlock() e.wg.Add(1) @@ -218,8 +238,9 @@ func (e *Executor) scheduleReadyTasks(taskChan chan<- Task) { // isTaskReady checks if a task is ready to be executed func (e *Executor) isTaskReady(task Task) bool { // Check if the task has already been completed + taskKey := getTaskKey(task) e.completedMutex.Lock() - if e.completedJobs[task.JobLabel] { + if e.completedJobs[taskKey] { e.completedMutex.Unlock() return false } @@ -256,7 +277,7 @@ func (e *Executor) worker(taskChan chan Task, newTasksChan chan struct{}) { // Update task state to running e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskRunning + e.taskStates[getTaskKey(task)] = TaskRunning e.statesMutex.Unlock() // Execute the task @@ -284,7 +305,9 @@ func (e *Executor) executeTask(task Task) { var stdoutBuf, stderrBuf strings.Builder // Execute the job - cmd := exec.Command(task.JobLabel + ".exec") + // Resolve the executable from runfiles + execPath := resolveExecutableFromRunfiles(task.JobLabel) + cmd := exec.Command(execPath) // Set environment variables (only system environment, not job-specific) cmd.Env = os.Environ() @@ -295,7 +318,7 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return @@ -307,7 +330,7 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return @@ -320,7 +343,7 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return @@ -332,7 +355,7 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() return @@ -344,9 +367,11 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() + // Log the error and return + log.Printf("Error starting command: %v", err) return } @@ -357,9 +382,11 @@ func (e *Executor) executeTask(task Task) { e.recordJobResult(task, false, err, startTime, time.Now(), "", "") // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() e.wg.Done() + // Log the error and return + log.Printf("Error writing config JSON to stdin: %v", err) return } @@ -380,13 +407,13 @@ func (e *Executor) executeTask(task Task) { log.Printf("Job succeeded: %s (duration: %v)", task.JobLabel, endTime.Sub(startTime)) // Update task state to succeeded e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskSucceeded + e.taskStates[getTaskKey(task)] = TaskSucceeded e.statesMutex.Unlock() } else { - log.Printf("Job failed: %s (duration: %v) Error: %v", task.JobLabel, endTime.Sub(startTime), err) + log.Printf("Job failed: %s (duration: %v) Error: %v\nStderr: %s", task.JobLabel, endTime.Sub(startTime), err, stderrBuf.String()) // Update task state to failed e.statesMutex.Lock() - e.taskStates[task.JobLabel] = TaskFailed + e.taskStates[getTaskKey(task)] = TaskFailed e.statesMutex.Unlock() } @@ -397,7 +424,7 @@ func (e *Executor) executeTask(task Task) { // Mark the job as completed e.completedMutex.Lock() - e.completedJobs[task.JobLabel] = true + e.completedJobs[getTaskKey(task)] = true for _, output := range task.Config.Outputs { e.completedJobs[output] = true } @@ -410,12 +437,48 @@ func (e *Executor) executeTask(task Task) { e.failedMutex.Unlock() // Log the failed task - log.Printf("Task failed: %s. Not scheduling any more tasks.", task.JobLabel) + log.Printf("Task failed: %s. Not scheduling any more tasks. Stderr: %s", task.JobLabel, stderrBuf.String()) } e.wg.Done() } +// resolveExecutableFromRunfiles resolves the executable path from runfiles +func resolveExecutableFromRunfiles(jobLabel string) string { + // Get the runfiles directory from the environment + runfilesDir := os.Getenv("RUNFILES_DIR") + if runfilesDir == "" { + // If RUNFILES_DIR is not set, try to find it from the executable path + execPath, err := os.Executable() + if err == nil { + runfilesDir = execPath + ".runfiles" + } + } + + // If we still don't have a runfiles directory, fall back to the old behavior + if runfilesDir == "" { + log.Printf("Warning: RUNFILES_DIR not found, falling back to direct executable path") + return jobLabel + ".exec" + } + + // Construct the path to the executable in the runfiles directory + // The executable should be in the _main directory with the job label name + ".exec" + // Extract the target name from the job label (which might be in the format "//package:target") + targetName := jobLabel + if colonIndex := strings.LastIndex(jobLabel, ":"); colonIndex != -1 { + targetName = jobLabel[colonIndex+1:] + } else { + // If there's no colon, it might be a path like "//package/target" + targetName = filepath.Base(jobLabel) + } + + execName := targetName + ".exec" + execPath := filepath.Join(runfilesDir, "_main", execName) + + log.Printf("Resolved executable path: %s", execPath) + return execPath +} + // recordJobResult records the result of a job execution func (e *Executor) recordJobResult(task Task, success bool, err error, startTime, endTime time.Time, stdout, stderr string) { result := JobResult{ @@ -449,18 +512,24 @@ func (e *Executor) logRunningJobsPeriodically(interval time.Duration, stopChan < succeededTasks := []string{} e.statesMutex.Lock() - for taskLabel, state := range e.taskStates { + for taskKey, state := range e.taskStates { + // Extract the job label from the task key for display + jobLabel := taskKey + if idx := strings.Index(taskKey, "|"); idx > 0 { + jobLabel = taskKey[:idx] + } + switch state { case TaskNotReady: - notReadyTasks = append(notReadyTasks, taskLabel) + notReadyTasks = append(notReadyTasks, jobLabel) case TaskReady: - readyTasks = append(readyTasks, taskLabel) + readyTasks = append(readyTasks, jobLabel) case TaskScheduled, TaskRunning: - scheduledOrRunningTasks = append(scheduledOrRunningTasks, taskLabel) + scheduledOrRunningTasks = append(scheduledOrRunningTasks, jobLabel) case TaskFailed: - failedTasks = append(failedTasks, taskLabel) + failedTasks = append(failedTasks, jobLabel) case TaskSucceeded: - succeededTasks = append(succeededTasks, taskLabel) + succeededTasks = append(succeededTasks, jobLabel) } } e.statesMutex.Unlock()