package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"runtime"
	"strconv"
	"strings"
	"sync"
)

// DataDepType represents the type of data dependency
type DataDepType string

const (
	Query       DataDepType = "query"
	Materialize DataDepType = "materialize"
)

// DataDep represents a data dependency
type DataDep struct {
	DepType DataDepType `json:"depType"`
	Ref     string      `json:"ref"`
}

// JobConfig represents the configuration for a job
type JobConfig struct {
	Inputs  []DataDep         `json:"inputs"`
	Outputs []string          `json:"outputs"`
	Args    []string          `json:"args"`
	Env     map[string]string `json:"env"`
}

// Task represents a job task
type Task struct {
	JobLabel string    `json:"jobLabel"`
	Config   JobConfig `json:"config"`
}

// JobGraph represents a graph of jobs
type JobGraph struct {
	Outputs []string `json:"outputs"`
	Nodes   []Task   `json:"nodes"`
}

// PartitionManifest represents a partition manifest
type PartitionManifest struct {
	Outputs   string    `json:"outputs"`
	Inputs    []DataDep `json:"inputs"`
	StartTime int       `json:"startTime"`
	EndTime   int       `json:"endTime"`
	Config    JobConfig `json:"config"`
}

// jobLabelToCfgPath converts a job label to a configuration path
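// For example, the (purely illustrative) label "//foo/bar:baz" maps to ".foo/bar/baz.cfg".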
func jobLabelToCfgPath(jobLabel string) string {
	return "." + strings.Replace(strings.Replace(jobLabel, "//", "", -1), ":", "/", -1) + ".cfg"
}

// configure configures the specified job to produce the desired outputs
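// It assumes (based on how the command is invoked below, not on any external
// spec) that DATABUILD_CANDIDATE_JOBS holds a JSON object mapping job labels
// to executable paths, and that the job executable, given the requested output
// refs as arguments, prints a JSON array of JobConfig objects on stdout.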
func configure(jobLabel string, outputRefs []string) ([]Task, error) {
	candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
	var jobPathMap map[string]string
	if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
		return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %v", err)
	}

	// Look up the executable path for this job
	execPath, ok := jobPathMap[jobLabel]
	if !ok {
		return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
	}

	// Check if executable exists
	if _, err := os.Stat(execPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("executable not found at path: %s", execPath)
		}
		return nil, fmt.Errorf("error checking executable: %v", err)
	}

	log.Printf("Executing job configuration: %s %v", execPath, outputRefs)
	cmd := exec.Command(execPath, outputRefs...)

	var stdout, stderr strings.Builder
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	if err != nil {
		log.Printf("Job configuration failed: %s", stderr.String())
		return nil, fmt.Errorf("failed to run job config: %s", stderr.String())
	}
	log.Printf("Job configuration succeeded for %s", jobLabel)

	// Parse the job configurations
	var jobConfigs []JobConfig
	err = json.Unmarshal([]byte(stdout.String()), &jobConfigs)
	if err != nil {
		log.Printf("Error parsing job configs for %s: %s. `%s`", jobLabel, err, stdout.String())
		return nil, fmt.Errorf("failed to parse job configs: %s", err)
	}

	// Create tasks
	tasks := make([]Task, len(jobConfigs))
	for i, cfg := range jobConfigs {
		tasks[i] = Task{
			JobLabel: jobLabel,
			Config:   cfg,
		}
	}

	log.Printf("Created %d tasks for job %s", len(tasks), jobLabel)
	return tasks, nil
}

// resolve maps each required job label to the output refs (partitions) it produces.
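// The lookup executable named by DATABUILD_JOB_LOOKUP_PATH is assumed (from
// the parsing below) to take the output refs as arguments and print that
// mapping as a JSON object (job label -> list of refs) on stdout.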
func resolve(outputRefs []string) (map[string][]string, error) {
	lookupPath := os.Getenv("DATABUILD_JOB_LOOKUP_PATH")
	// Fail with a clear message if the lookup path is missing rather than
	// letting exec.Command fail on an empty string.
	if lookupPath == "" {
		return nil, fmt.Errorf("DATABUILD_JOB_LOOKUP_PATH is not set")
	}

	// Run the job lookup
	log.Printf("Executing job lookup: %s %v", lookupPath, outputRefs)
	cmd := exec.Command(lookupPath, outputRefs...)
	var stdout, stderr strings.Builder
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	if err != nil {
		log.Printf("Job lookup failed: %s", stderr.String())
		return nil, fmt.Errorf("failed to run job lookup: %s", stderr.String())
	}
	log.Printf("Job lookup succeeded for %d output refs", len(outputRefs))

	// Parse the result
	var result map[string][]string
	err = json.Unmarshal([]byte(stdout.String()), &result)
	if err != nil {
		log.Printf("Error parsing job lookup result: %s", err)
		return nil, fmt.Errorf("failed to parse job lookup result: %s", err)
	}

	log.Printf("Job lookup found %d job mappings", len(result))
	for job, refs := range result {
		log.Printf("  Job %s produces %d refs", job, len(refs))
	}

	return result, nil
}

// configureParallel configures multiple jobs in parallel
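// Implementation note: a bounded worker pool drains a buffered channel of
// (job label, produced refs) pairs and fails fast; once any configure call
// errors, the first error is recorded and remaining workers stop taking new jobs.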
func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, error) {
	var wg sync.WaitGroup
	tasksChan := make(chan []Task, len(jobRefs))
	errorChan := make(chan error, len(jobRefs))
	jobsChan := make(chan struct {
		jobLabel     string
		producedRefs []string
	}, len(jobRefs))

	// Use a mutex to protect access to the error variable
	var mu sync.Mutex
	var firstErr error

	// Fill the jobs channel
	for jobLabel, producedRefs := range jobRefs {
		jobsChan <- struct {
			jobLabel     string
			producedRefs []string
		}{jobLabel, producedRefs}
	}
	close(jobsChan)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobsChan {
				// Check if an error has already occurred
				mu.Lock()
				if firstErr != nil {
					mu.Unlock()
					return
				}
				mu.Unlock()

				tasks, err := configure(job.jobLabel, job.producedRefs)
				if err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
						errorChan <- err
					}
					mu.Unlock()
					return
				}
				tasksChan <- tasks
			}
		}()
	}

	// Wait for all workers to finish
	go func() {
		wg.Wait()
		close(tasksChan)
		close(errorChan)
	}()

	// Collect results
	var allTasks []Task
	for tasks := range tasksChan {
		allTasks = append(allTasks, tasks...)
	}

	// Check for errors
	select {
	case err := <-errorChan:
		if err != nil {
			return nil, err
		}
	default:
	}

	return allTasks, nil
}

// plan creates a job graph for given output references
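// Planning iterates to a fixed point: resolve the producers of the currently
// unhandled refs, configure those jobs, then queue any Materialize inputs of
// the new tasks as refs to handle in the next epoch, up to a 1000-epoch cap.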
func plan(outputRefs []string) (*JobGraph, error) {
	log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs)
	unhandledRefs := make(map[string]bool)
	for _, ref := range outputRefs {
		unhandledRefs[ref] = true
	}
	epoch := 0
	var nodes []Task

	// Determine the number of workers based on available CPU cores or environment variable
	numWorkers := runtime.NumCPU()
	if workerEnv := os.Getenv("DATABUILD_PARALLEL_WORKERS"); workerEnv != "" {
		if parsedWorkers, err := strconv.Atoi(workerEnv); err != nil {
			log.Printf("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '%s', using default: %d", workerEnv, numWorkers)
		} else if parsedWorkers < 1 {
			numWorkers = 1
			log.Printf("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: %d", numWorkers)
		} else {
			numWorkers = parsedWorkers
		}
	}
	log.Printf("Using %d workers for parallel execution", numWorkers)

	for len(unhandledRefs) > 0 {
		if epoch >= 1000 {
			log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch)
			return nil, fmt.Errorf("still planning after %d epochs, giving up", epoch)
		}

		log.Printf("Planning epoch %d with %d unhandled refs", epoch, len(unhandledRefs))
		// Resolve jobs for all unhandled refs
		unhandledRefsList := make([]string, 0, len(unhandledRefs))
		for ref := range unhandledRefs {
			unhandledRefsList = append(unhandledRefsList, ref)
		}
		jobRefs, err := resolve(unhandledRefsList)
		if err != nil {
			return nil, err
		}

		// Configure jobs in parallel
		newNodes, err := configureParallel(jobRefs, numWorkers)
		if err != nil {
			return nil, err
		}

		// Remove handled refs
		for _, producedRefs := range jobRefs {
			for _, ref := range producedRefs {
				delete(unhandledRefs, ref)
			}
		}

		if len(unhandledRefs) > 0 {
			log.Printf("Error: Still have unhandled refs after configuration phase: %v", unhandledRefs)
			return nil, fmt.Errorf("should have no unhandled refs after configuration phase, but had: %v", unhandledRefs)
		}

		// Add new nodes to the graph
		nodes = append(nodes, newNodes...)
		log.Printf("Planning epoch %d completed: added %d new nodes, total nodes: %d", epoch, len(newNodes), len(nodes))

		// Plan next epoch
		newUnhandledCount := 0
		for _, task := range newNodes {
			for _, input := range task.Config.Inputs {
				if input.DepType == Materialize {
					if !unhandledRefs[input.Ref] {
						newUnhandledCount++
					}
					unhandledRefs[input.Ref] = true
				}
			}
		}
		if newUnhandledCount > 0 {
			log.Printf("Added %d new unhandled refs for next planning epoch", newUnhandledCount)
		}
		epoch++
	}

	if len(nodes) == 0 {
		log.Printf("Planning failed: no nodes created for output refs %v", outputRefs)
		return nil, fmt.Errorf("unknown failure in graph planning")
	}

	log.Printf("Planning complete: created graph with %d nodes for %d output refs", len(nodes), len(outputRefs))
	return &JobGraph{
		Outputs: outputRefs,
		Nodes:   nodes,
	}, nil
}
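
// main dispatches on DATABUILD_MODE: "plan" builds a JobGraph for the output
// refs given as arguments, "lookup" runs resolve directly, and "import_test"
// just prints a readiness message; plan and lookup print their result as JSON
// on stdout.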
func main() {
	mode := os.Getenv("DATABUILD_MODE")
	log.Printf("Starting analyze.go in mode: %s", mode)

	switch mode {
	case "plan":
		// Get output refs from command line arguments
		outputRefs := os.Args[1:]
		graph, err := plan(outputRefs)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
			os.Exit(1)
		}

		// Output the job graph as JSON
		jsonData, err := json.Marshal(graph)
		if err != nil {
			log.Printf("Error marshaling job graph: %s", err)
			fmt.Fprintf(os.Stderr, "Error marshaling job graph: %s\n", err)
			os.Exit(1)
		}
		log.Printf("Successfully generated job graph with %d nodes", len(graph.Nodes))
		fmt.Println(string(jsonData))
	case "lookup":
		// Get output refs from command line arguments
		outputRefs := os.Args[1:]
		result, err := resolve(outputRefs)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
			os.Exit(1)
		}

		// Output the result as JSON
		jsonData, err := json.Marshal(result)
		if err != nil {
			log.Printf("Error marshaling lookup result: %s", err)
			fmt.Fprintf(os.Stderr, "Error marshaling lookup result: %s\n", err)
			os.Exit(1)
		}
		log.Printf("Successfully completed lookup for %d output refs with %d job mappings", len(outputRefs), len(result))
		fmt.Println(string(jsonData))
	case "import_test":
		log.Printf("Running in import_test mode")
		fmt.Println("ok :)")
		log.Printf("Import test completed successfully")
	default:
		log.Printf("Error: Unknown mode '%s'", mode)
		fmt.Fprintf(os.Stderr, "Unknown MODE `%s`\n", mode)
		os.Exit(1)
	}
}