databuild/graph/analyze.go
2025-04-18 19:38:05 -07:00

279 lines
8.2 KiB
Go

package main
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"log"
)
// DataDepType names the way a job consumes one of its inputs. The value is
// the exact string used for the "depType" field in JSON.
type DataDepType string

// Known dependency kinds. plan only schedules upstream work for Materialize
// inputs; Query inputs are kept on the task but are not expanded.
const (
	Query       DataDepType = "query"
	Materialize DataDepType = "materialize"
)
// DataDep is one input of a job: a ref plus the way that ref is consumed.
type DataDep struct {
	// DepType is the dependency kind; plan follows only Materialize deps.
	DepType DataDepType `json:"depType"`

	// Ref is the reference string of the input. For Materialize deps, plan
	// passes it to resolve as an output that still has to be produced.
	Ref string `json:"ref"`
}
// JobConfig is one element of the JSON array that a job executable prints on
// stdout when configure runs it.
type JobConfig struct {
	// Inputs are the data dependencies of this configuration.
	Inputs []DataDep `json:"inputs"`

	// Outputs are presumably the refs this configuration produces; the field
	// is not read in this file — confirm against the consumer.
	Outputs []string `json:"outputs"`

	// Args is carried through untouched here; presumably the arguments for
	// running the job — TODO confirm.
	Args []string `json:"args"`

	// Env is carried through untouched here; presumably the environment for
	// running the job — TODO confirm.
	Env map[string]string `json:"env"`
}
// Task is a node of the job graph: a job label paired with one of the
// configurations that job's executable reported.
type Task struct {
	// JobLabel is the label of the job the configuration came from.
	JobLabel string `json:"jobLabel"`

	// Config is the configuration decoded from the job's output.
	Config JobConfig `json:"config"`
}
// JobGraph is the result of planning, serialized to JSON by main in "plan"
// mode.
type JobGraph struct {
	// Outputs are the output refs that planning was requested for.
	Outputs []string `json:"outputs"`

	// Nodes are all tasks collected over every planning epoch.
	Nodes []Task `json:"nodes"`
}
// PartitionManifest describes a partition together with the inputs and job
// configuration associated with it. Nothing in the visible part of this file
// constructs or reads it.
type PartitionManifest struct {
	// NOTE(review): Outputs is a single string here, while JobConfig.Outputs
	// and JobGraph.Outputs are []string — confirm this is intended.
	Outputs string `json:"outputs"`

	// Inputs are the data dependencies recorded for the partition.
	Inputs []DataDep `json:"inputs"`

	// StartTime is presumably when the producing job started; the unit is
	// not established in this file — TODO confirm.
	StartTime int `json:"startTime"`

	// EndTime is presumably when the producing job finished; same unit
	// caveat as StartTime — TODO confirm.
	EndTime int `json:"endTime"`

	// Config is the job configuration associated with the partition.
	Config JobConfig `json:"config"`
}
// jobLabelToCfgPath derives a config file path from a job label: every "//"
// is removed first, then every ":" becomes "/", and the result is wrapped in
// a leading "." and a trailing ".cfg". For example "//pkg/sub:job" yields
// ".pkg/sub/job.cfg".
func jobLabelToCfgPath(jobLabel string) string {
	withoutRoot := strings.ReplaceAll(jobLabel, "//", "")
	asPath := strings.ReplaceAll(withoutRoot, ":", "/")
	return "." + asPath + ".cfg"
}
// configure runs a job's executable to obtain the tasks needed to produce
// outputRefs.
//
// DATABUILD_CANDIDATE_JOBS must hold a JSON object mapping job labels to
// executable paths. The executable registered for jobLabel is invoked with
// outputRefs as its arguments and must print a JSON array of JobConfig values
// on stdout; each element becomes one Task labelled with jobLabel.
//
// An error is returned when the candidate map cannot be parsed, jobLabel is
// not in it, the executable is missing or cannot be stat'ed, the process
// cannot be started or exits unsuccessfully (the underlying error is wrapped
// and the captured stderr appended), or stdout cannot be decoded.
func configure(jobLabel string, outputRefs []string) ([]Task, error) {
	candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
	var jobPathMap map[string]string
	if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
		return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %w", err)
	}

	// Look up the executable path for this job.
	execPath, ok := jobPathMap[jobLabel]
	if !ok {
		return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
	}

	// Fail early with a clear message when the executable is absent.
	if _, err := os.Stat(execPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("executable not found at path: %s", execPath)
		}
		return nil, fmt.Errorf("error checking executable: %w", err)
	}

	log.Printf("Executing job configuration: %s %v", execPath, outputRefs)
	cmd := exec.Command(execPath, outputRefs...)
	var stdout, stderr strings.Builder
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		// stderr is empty when the process never started (for example a
		// permission error), so the run error itself must be reported too.
		log.Printf("Job configuration failed: %v: %s", err, stderr.String())
		return nil, fmt.Errorf("Failed to run job config: %w: %s", err, stderr.String())
	}
	log.Printf("Job configuration succeeded for %s", jobLabel)

	// Parse the job configurations.
	var jobConfigs []JobConfig
	if err := json.Unmarshal([]byte(stdout.String()), &jobConfigs); err != nil {
		log.Printf("Error parsing job configs for %s: %s. `%s`", jobLabel, err, stdout.String())
		return nil, fmt.Errorf("Failed to parse job configs: %w", err)
	}

	// One task per configuration, all attributed to the same job.
	tasks := make([]Task, len(jobConfigs))
	for i, cfg := range jobConfigs {
		tasks[i] = Task{
			JobLabel: jobLabel,
			Config:   cfg,
		}
	}
	log.Printf("Created %d tasks for job %s", len(tasks), jobLabel)
	return tasks, nil
}
// resolve asks the external job-lookup executable which job produces each of
// the requested output refs.
//
// The executable path is read from DATABUILD_JOB_LOOKUP_PATH and invoked with
// outputRefs as its arguments. Its stdout must be a JSON object mapping a job
// label to the list of refs that job produces; the decoded object is returned.
//
// An error is returned when the environment variable is unset or empty, when
// the process cannot be started or exits unsuccessfully (the underlying error
// is wrapped and the captured stderr appended), or when stdout cannot be
// decoded into the expected shape.
func resolve(outputRefs []string) (map[string][]string, error) {
	lookupPath := os.Getenv("DATABUILD_JOB_LOOKUP_PATH")
	if lookupPath == "" {
		// Without this check the failure surfaces as an exec error whose
		// message gives no hint about the missing configuration.
		return nil, fmt.Errorf("DATABUILD_JOB_LOOKUP_PATH is not set")
	}

	// Run the job lookup.
	log.Printf("Executing job lookup: %s %v", lookupPath, outputRefs)
	cmd := exec.Command(lookupPath, outputRefs...)
	var stdout, stderr strings.Builder
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		// stderr is empty when the process never started, so the run error
		// itself must be reported as well.
		log.Printf("Job lookup failed: %v: %s", err, stderr.String())
		return nil, fmt.Errorf("Failed to run job lookup: %w: %s", err, stderr.String())
	}
	log.Printf("Job lookup succeeded for %d output refs", len(outputRefs))

	// Parse the result.
	var result map[string][]string
	if err := json.Unmarshal([]byte(stdout.String()), &result); err != nil {
		log.Printf("Error parsing job lookup result: %s", err)
		return nil, fmt.Errorf("Failed to parse job lookup result: %w", err)
	}

	log.Printf("Job lookup found %d job mappings", len(result))
	for job, refs := range result {
		log.Printf("  Job %s produces %d refs", job, len(refs))
	}
	return result, nil
}
// plan builds the job graph needed to produce outputRefs.
//
// Planning proceeds in epochs. Each epoch resolves every currently unhandled
// ref to the job that produces it, runs configure for each of those jobs, and
// then marks the Materialize inputs of the newly created tasks as unhandled
// for the next epoch. Query inputs are not expanded. Planning ends when no
// unhandled refs remain.
//
// An error is returned when outputRefs is empty, when resolve or configure
// fails, when a ref is left without a producing job after an epoch, when
// planning is still running after 1000 epochs, or when no tasks were created.
//
// NOTE(review): refs planned in an earlier epoch are not remembered, so an
// input that was already planned is planned again, and a dependency cycle
// only ends at the epoch limit — confirm whether that is intended.
func plan(outputRefs []string) (*JobGraph, error) {
	const maxEpochs = 1000

	log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs)
	if len(outputRefs) == 0 {
		// Nothing to plan; report that directly instead of falling through
		// to the generic "unknown failure" at the end.
		return nil, fmt.Errorf("no output refs requested")
	}

	unhandledRefs := make(map[string]bool, len(outputRefs))
	for _, ref := range outputRefs {
		unhandledRefs[ref] = true
	}

	epoch := 0
	var nodes []Task
	for len(unhandledRefs) > 0 {
		if epoch >= maxEpochs {
			log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch)
			return nil, fmt.Errorf("Still planning after %d epochs, giving up", epoch)
		}
		log.Printf("Planning epoch %d with %d unhandled refs", epoch, len(unhandledRefs))

		// Resolve jobs for all unhandled refs.
		unhandledRefsList := make([]string, 0, len(unhandledRefs))
		for ref := range unhandledRefs {
			unhandledRefsList = append(unhandledRefsList, ref)
		}
		jobRefs, err := resolve(unhandledRefsList)
		if err != nil {
			return nil, err
		}

		// Configure jobs.
		var newNodes []Task
		for jobLabel, producedRefs := range jobRefs {
			tasks, err := configure(jobLabel, producedRefs)
			if err != nil {
				return nil, err
			}
			newNodes = append(newNodes, tasks...)
			// Remove handled refs.
			for _, ref := range producedRefs {
				delete(unhandledRefs, ref)
			}
		}

		// Every ref passed to resolve must have been claimed by some job.
		if len(unhandledRefs) > 0 {
			log.Printf("Error: Still have unhandled refs after configuration phase: %v", unhandledRefs)
			return nil, fmt.Errorf("Should have no unhandled refs after configuration phase, but had: %v", unhandledRefs)
		}

		// Add new nodes to the graph. The completion message is logged before
		// the counter advances so it names the epoch that just ran.
		nodes = append(nodes, newNodes...)
		log.Printf("Planning epoch %d completed: added %d new nodes, total nodes: %d", epoch, len(newNodes), len(nodes))
		epoch++

		// Plan next epoch: Materialize inputs of the new tasks must be
		// produced as well.
		newUnhandledCount := 0
		for _, task := range newNodes {
			for _, input := range task.Config.Inputs {
				if input.DepType != Materialize {
					continue
				}
				if !unhandledRefs[input.Ref] {
					newUnhandledCount++
				}
				unhandledRefs[input.Ref] = true
			}
		}
		if newUnhandledCount > 0 {
			log.Printf("Added %d new unhandled refs for next planning epoch", newUnhandledCount)
		}
	}

	if len(nodes) == 0 {
		log.Printf("Planning failed: no nodes created for output refs %v", outputRefs)
		return nil, fmt.Errorf("Unknown failure in graph planning")
	}

	log.Printf("Planning complete: created graph with %d nodes for %d output refs", len(nodes), len(outputRefs))
	return &JobGraph{
		Outputs: outputRefs,
		Nodes:   nodes,
	}, nil
}
// main dispatches on the DATABUILD_MODE environment variable:
//
//   - "plan": plans a job graph for the refs given as command-line arguments
//     and prints it to stdout as JSON.
//   - "lookup": resolves the refs given as command-line arguments to their
//     producing jobs and prints the mapping to stdout as JSON.
//   - "import_test": prints "ok :)" and does nothing else.
//
// Any other mode, and any failure in the selected mode, writes a message to
// stderr and exits with status 1.
func main() {
	mode := os.Getenv("DATABUILD_MODE")
	log.Printf("Starting analyze.go in mode: %s", mode)

	switch mode {
	case "plan":
		// Output refs come from the command line.
		refs := os.Args[1:]
		graph, planErr := plan(refs)
		if planErr != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", planErr)
			os.Exit(1)
		}

		// Emit the job graph as JSON.
		encoded, marshalErr := json.Marshal(graph)
		if marshalErr != nil {
			log.Printf("Error marshaling job graph: %s", marshalErr)
			fmt.Fprintf(os.Stderr, "Error marshaling job graph: %s\n", marshalErr)
			os.Exit(1)
		}
		log.Printf("Successfully generated job graph with %d nodes", len(graph.Nodes))
		fmt.Println(string(encoded))

	case "lookup":
		// Output refs come from the command line.
		refs := os.Args[1:]
		mapping, lookupErr := resolve(refs)
		if lookupErr != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", lookupErr)
			os.Exit(1)
		}

		// Emit the lookup result as JSON.
		encoded, marshalErr := json.Marshal(mapping)
		if marshalErr != nil {
			log.Printf("Error marshaling lookup result: %s", marshalErr)
			fmt.Fprintf(os.Stderr, "Error marshaling lookup result: %s\n", marshalErr)
			os.Exit(1)
		}
		log.Printf("Successfully completed lookup for %d output refs with %d job mappings", len(refs), len(mapping))
		fmt.Println(string(encoded))

	case "import_test":
		log.Printf("Running in import_test mode")
		fmt.Println("ok :)")
		log.Printf("Import test completed successfully")

	default:
		log.Printf("Error: Unknown mode '%s'", mode)
		fmt.Fprintf(os.Stderr, "Unknown MODE `%s`\n", mode)
		os.Exit(1)
	}
}