// analyze.go (per its startup log line) plans databuild job graphs. Depending
// on DATABUILD_MODE it resolves output refs to jobs ("lookup"), plans a full
// job graph as JSON ("plan"), renders that graph as a Mermaid flowchart
// ("mermaid"), or performs a trivial smoke check ("import_test").
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
)

// DataDepType represents the type of data dependency.
type DataDepType string

const (
	// Query marks an input dependency; plan does not schedule a producer for it
	// and the Mermaid diagram draws it as a dashed edge.
	Query DataDepType = "query"
	// Materialize marks an input dependency whose producer plan must also add
	// to the graph.
	Materialize DataDepType = "materialize"
)

// DataDep represents a data dependency.
type DataDep struct {
	DepType DataDepType `json:"depType"`
	Ref     string      `json:"ref"`
}

// JobConfig represents the configuration for a job, as printed (in a JSON
// array) by a job's configuration executable.
type JobConfig struct {
	Inputs  []DataDep         `json:"inputs"`
	Outputs []string          `json:"outputs"`
	Args    []string          `json:"args"`
	Env     map[string]string `json:"env"`
}

// Task represents a job task: one configuration of one job.
type Task struct {
	JobLabel string    `json:"jobLabel"`
	Config   JobConfig `json:"config"`
}

// JobGraph represents a graph of jobs planned for the requested outputs.
type JobGraph struct {
	Outputs []string `json:"outputs"`
	Nodes   []Task   `json:"nodes"`
}

// PartitionManifest represents a partition manifest.
// NOTE(review): not used in this file; units of StartTime/EndTime are not
// established here.
type PartitionManifest struct {
	Outputs   string    `json:"outputs"`
	Inputs    []DataDep `json:"inputs"`
	StartTime int       `json:"startTime"`
	EndTime   int       `json:"endTime"`
	Config    JobConfig `json:"config"`
}

// jobLabelToCfgPath converts a job label to a configuration path by dropping
// "//", turning ":" into "/", prefixing "." and appending ".cfg"
// (e.g. "//foo:bar" -> ".foo/bar.cfg").
// NOTE(review): the result has no "/" after the leading "." — confirm that is
// intended rather than "./foo/bar.cfg".
func jobLabelToCfgPath(jobLabel string) string {
	return "." + strings.ReplaceAll(strings.ReplaceAll(jobLabel, "//", ""), ":", "/") + ".cfg"
}

// runCommand executes path with args and returns its captured stdout and
// stderr. A non-nil error means the command could not be started or exited
// unsuccessfully.
func runCommand(path string, args []string) (stdout, stderr string, err error) {
	cmd := exec.Command(path, args...)
	var outBuf, errBuf strings.Builder
	cmd.Stdout = &outBuf
	cmd.Stderr = &errBuf
	err = cmd.Run()
	return outBuf.String(), errBuf.String(), err
}

// configure runs the configuration executable of jobLabel so that it produces
// outputRefs, and returns one Task per JobConfig the executable prints.
//
// The executable path is looked up in the JSON object stored in the
// DATABUILD_CANDIDATE_JOBS environment variable (job label -> path). The
// executable receives outputRefs as arguments and must print a JSON array of
// JobConfig objects on stdout.
func configure(jobLabel string, outputRefs []string) ([]Task, error) {
	candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
	var jobPathMap map[string]string
	if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
		return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %w", err)
	}

	// Look up the executable path for this job.
	execPath, ok := jobPathMap[jobLabel]
	if !ok {
		return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
	}

	// Fail early with a clear message if the executable is missing.
	if _, err := os.Stat(execPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("executable not found at path: %s", execPath)
		}
		return nil, fmt.Errorf("error checking executable: %w", err)
	}

	log.Printf("Executing job configuration: %s %v", execPath, outputRefs)
	stdout, stderr, err := runCommand(execPath, outputRefs)
	if err != nil {
		log.Printf("Job configuration failed: %s", stderr)
		// Include the exec error: stderr is empty when the process never started.
		return nil, fmt.Errorf("Failed to run job config: %w: %s", err, stderr)
	}
	log.Printf("Job configuration succeeded for %s", jobLabel)

	var jobConfigs []JobConfig
	if err := json.Unmarshal([]byte(stdout), &jobConfigs); err != nil {
		log.Printf("Error parsing job configs for %s: %s. `%s`", jobLabel, err, stdout)
		return nil, fmt.Errorf("Failed to parse job configs: %w", err)
	}

	tasks := make([]Task, len(jobConfigs))
	for i, cfg := range jobConfigs {
		tasks[i] = Task{JobLabel: jobLabel, Config: cfg}
	}
	log.Printf("Created %d tasks for job %s", len(tasks), jobLabel)
	return tasks, nil
}

// resolve runs the job lookup executable named by DATABUILD_JOB_LOOKUP_PATH
// with outputRefs as arguments and returns its JSON result: a mapping from job
// label to the refs that job produces.
func resolve(outputRefs []string) (map[string][]string, error) {
	lookupPath := os.Getenv("DATABUILD_JOB_LOOKUP_PATH")
	if lookupPath == "" {
		return nil, fmt.Errorf("DATABUILD_JOB_LOOKUP_PATH is not set")
	}

	log.Printf("Executing job lookup: %s %v", lookupPath, outputRefs)
	stdout, stderr, err := runCommand(lookupPath, outputRefs)
	if err != nil {
		log.Printf("Job lookup failed: %s", stderr)
		return nil, fmt.Errorf("Failed to run job lookup: %w: %s", err, stderr)
	}
	log.Printf("Job lookup succeeded for %d output refs", len(outputRefs))

	var result map[string][]string
	if err := json.Unmarshal([]byte(stdout), &result); err != nil {
		log.Printf("Error parsing job lookup result: %s", err)
		return nil, fmt.Errorf("Failed to parse job lookup result: %w", err)
	}

	log.Printf("Job lookup found %d job mappings", len(result))
	for job, refs := range result {
		log.Printf(" Job %s produces %d refs", job, len(refs))
	}
	return result, nil
}

// configureParallel runs configure for every job in jobRefs (job label ->
// refs it must produce) using up to numWorkers goroutines; values below 1 are
// treated as 1.
//
// Tasks are returned grouped by job label in sorted label order, so the
// resulting graph does not depend on map iteration order. The first error
// encountered is returned; once an error is recorded, workers stop picking up
// new jobs (jobs already running are allowed to finish).
func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, error) {
	if numWorkers < 1 {
		numWorkers = 1
	}

	labels := make([]string, 0, len(jobRefs))
	for label := range jobRefs {
		labels = append(labels, label)
	}
	sort.Strings(labels)

	// Each worker writes only results[i] for the indices it receives, so no
	// lock is needed for the slice itself.
	results := make([][]Task, len(labels))
	indices := make(chan int, len(labels))
	for i := range labels {
		indices <- i
	}
	close(indices)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	failed := func() bool {
		mu.Lock()
		defer mu.Unlock()
		return firstErr != nil
	}

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indices {
				if failed() {
					return
				}
				tasks, err := configure(labels[i], jobRefs[labels[i]])
				if err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}
				results[i] = tasks
			}
		}()
	}
	wg.Wait() // establishes happens-before for results and firstErr

	if firstErr != nil {
		return nil, firstErr
	}
	var allTasks []Task
	for _, tasks := range results {
		allTasks = append(allTasks, tasks...)
	}
	return allTasks, nil
}

// parallelWorkers returns the worker count used by plan: runtime.NumCPU(),
// overridden by DATABUILD_PARALLEL_WORKERS when it parses as an integer
// (values below 1 are clamped to 1; unparsable values keep the default).
func parallelWorkers() int {
	numWorkers := runtime.NumCPU()
	workerEnv := os.Getenv("DATABUILD_PARALLEL_WORKERS")
	if workerEnv == "" {
		return numWorkers
	}
	parsedWorkers, err := strconv.Atoi(workerEnv)
	switch {
	case err != nil:
		log.Printf("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '%s', using default: %d", workerEnv, numWorkers)
	case parsedWorkers < 1:
		numWorkers = 1
		log.Printf("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: %d", numWorkers)
	default:
		numWorkers = parsedWorkers
	}
	return numWorkers
}

// plan creates a job graph for the given output references.
//
// Planning proceeds in epochs: each epoch resolves the still-unhandled refs to
// jobs, configures those jobs in parallel, and queues every Materialize input
// of the new tasks that no task in the graph produces yet. Planning fails if
// the lookup leaves a requested ref unmapped, if any step errors, or after
// 1000 epochs.
// NOTE(review): a dependency cycle among already-planned refs terminates
// planning rather than erroring; cycle detection, if needed, belongs here.
func plan(outputRefs []string) (*JobGraph, error) {
	log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs)

	unhandledRefs := make(map[string]bool, len(outputRefs))
	for _, ref := range outputRefs {
		unhandledRefs[ref] = true
	}
	// handledRefs holds every ref that already has a producing task in the
	// graph, so shared upstream dependencies are configured only once.
	handledRefs := make(map[string]bool)

	numWorkers := parallelWorkers()
	log.Printf("Using %d workers for parallel execution", numWorkers)

	var nodes []Task
	for epoch := 0; len(unhandledRefs) > 0; epoch++ {
		if epoch >= 1000 {
			log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch)
			return nil, fmt.Errorf("Still planning after %d epochs, giving up", epoch)
		}
		log.Printf("Planning epoch %d with %d unhandled refs", epoch, len(unhandledRefs))

		unhandledRefsList := make([]string, 0, len(unhandledRefs))
		for ref := range unhandledRefs {
			unhandledRefsList = append(unhandledRefsList, ref)
		}
		jobRefs, err := resolve(unhandledRefsList)
		if err != nil {
			return nil, err
		}

		newNodes, err := configureParallel(jobRefs, numWorkers)
		if err != nil {
			return nil, err
		}

		for _, producedRefs := range jobRefs {
			for _, ref := range producedRefs {
				delete(unhandledRefs, ref)
				handledRefs[ref] = true
			}
		}
		if len(unhandledRefs) > 0 {
			log.Printf("Error: Still have unhandled refs after configuration phase: %v", unhandledRefs)
			return nil, fmt.Errorf("Should have no unhandled refs after configuration phase, but had: %v", unhandledRefs)
		}

		nodes = append(nodes, newNodes...)
		// Mark all new outputs first so one new task's input is not queued when
		// a sibling task from this same epoch already produces it.
		for _, task := range newNodes {
			for _, out := range task.Config.Outputs {
				handledRefs[out] = true
			}
		}
		log.Printf("Planning epoch %d completed: added %d new nodes, total nodes: %d", epoch, len(newNodes), len(nodes))

		// Queue the next epoch: Materialize inputs with no producer yet.
		newUnhandledCount := 0
		for _, task := range newNodes {
			for _, input := range task.Config.Inputs {
				if input.DepType != Materialize || handledRefs[input.Ref] || unhandledRefs[input.Ref] {
					continue
				}
				unhandledRefs[input.Ref] = true
				newUnhandledCount++
			}
		}
		if newUnhandledCount > 0 {
			log.Printf("Added %d new unhandled refs for next planning epoch", newUnhandledCount)
		}
	}

	if len(nodes) == 0 {
		log.Printf("Planning failed: no nodes created for output refs %v", outputRefs)
		return nil, fmt.Errorf("Unknown failure in graph planning")
	}
	log.Printf("Planning complete: created graph with %d nodes for %d output refs", len(nodes), len(outputRefs))
	return &JobGraph{
		Outputs: outputRefs,
		Nodes:   nodes,
	}, nil
}

// mermaidJobNodeID builds the flowchart node ID for a task from its job label
// and outputs. Only "//" and ":" in the label and "/" in the outputs are
// rewritten to "_".
// NOTE(review): a single "/" inside a job label (e.g. "//pkg/sub:job") is left
// in the ID — confirm Mermaid accepts it.
func mermaidJobNodeID(task Task) string {
	id := "job_" + strings.ReplaceAll(task.JobLabel, "//", "_")
	id = strings.ReplaceAll(id, ":", "_")
	return id + "_" + strings.ReplaceAll(strings.Join(task.Config.Outputs, "_"), "/", "_")
}

// mermaidRefNodeID builds the flowchart node ID for a partition ref.
func mermaidRefNodeID(ref string) string {
	return "ref_" + strings.ReplaceAll(ref, "/", "_")
}

// generateMermaidDiagram generates a Mermaid flowchart diagram from a job graph.
//
// Jobs become ":::job" nodes and partitions become cylinder nodes, styled
// "outputPartition" when listed in graph.Outputs. Materialize inputs get solid
// edges, other inputs dashed edges, and each job gets a solid edge to each of
// its outputs. Each node is declared once, at first sight.
// NOTE(review): a partition first seen as an output is labelled
// "Partition: <ref>", but only "<ref>" when first seen as an input.
func generateMermaidDiagram(graph *JobGraph) string {
	var b strings.Builder
	b.WriteString("flowchart TD\n")

	addedNodes := make(map[string]bool)
	addedRefs := make(map[string]bool)

	isOutputRef := make(map[string]bool, len(graph.Outputs))
	for _, ref := range graph.Outputs {
		isOutputRef[ref] = true
	}
	partitionClass := func(ref string) string {
		if isOutputRef[ref] {
			return "outputPartition"
		}
		return "partition"
	}

	for _, task := range graph.Nodes {
		jobNodeID := mermaidJobNodeID(task)

		outputsLabel := ""
		switch len(task.Config.Outputs) {
		case 0:
		case 1:
			outputsLabel = " [" + task.Config.Outputs[0] + "]"
		default:
			outputsLabel = " [" + task.Config.Outputs[0] + ", ...]"
		}

		if !addedNodes[jobNodeID] {
			fmt.Fprintf(&b, " %s[\"`**%s** %s`\"]:::job\n", jobNodeID, task.JobLabel, outputsLabel)
			addedNodes[jobNodeID] = true
		}

		for _, input := range task.Config.Inputs {
			refNodeID := mermaidRefNodeID(input.Ref)
			if !addedRefs[refNodeID] {
				fmt.Fprintf(&b, " %s[(\"%s\")]:::%s\n", refNodeID, input.Ref, partitionClass(input.Ref))
				addedRefs[refNodeID] = true
			}
			if input.DepType == Materialize {
				fmt.Fprintf(&b, " %s --> %s\n", refNodeID, jobNodeID)
			} else {
				fmt.Fprintf(&b, " %s -.-> %s\n", refNodeID, jobNodeID)
			}
		}

		for _, output := range task.Config.Outputs {
			refNodeID := mermaidRefNodeID(output)
			if !addedRefs[refNodeID] {
				fmt.Fprintf(&b, " %s[(\"Partition: %s\")]:::%s\n", refNodeID, output, partitionClass(output))
				addedRefs[refNodeID] = true
			}
			fmt.Fprintf(&b, " %s --> %s\n", jobNodeID, refNodeID)
		}
	}

	// Written with WriteString (not Fprintf) so "%%" stays a Mermaid comment.
	b.WriteString("\n %% Styling\n")
	b.WriteString(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n")
	b.WriteString(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n")
	b.WriteString(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n")
	return b.String()
}

// main dispatches on DATABUILD_MODE; the remaining command-line arguments are
// the output refs. Any failure is reported on stderr with exit status 1.
func main() {
	mode := os.Getenv("DATABUILD_MODE")
	log.Printf("Starting analyze.go in mode: %s", mode)

	switch mode {
	case "plan":
		outputRefs := os.Args[1:]
		graph, err := plan(outputRefs)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
			os.Exit(1)
		}
		jsonData, err := json.Marshal(graph)
		if err != nil {
			log.Printf("Error marshaling job graph: %s", err)
			fmt.Fprintf(os.Stderr, "Error marshaling job graph: %s\n", err)
			os.Exit(1)
		}
		log.Printf("Successfully generated job graph with %d nodes", len(graph.Nodes))
		fmt.Println(string(jsonData))

	case "lookup":
		outputRefs := os.Args[1:]
		result, err := resolve(outputRefs)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
			os.Exit(1)
		}
		jsonData, err := json.Marshal(result)
		if err != nil {
			log.Printf("Error marshaling lookup result: %s", err)
			fmt.Fprintf(os.Stderr, "Error marshaling lookup result: %s\n", err)
			os.Exit(1)
		}
		log.Printf("Successfully completed lookup for %d output refs with %d job mappings", len(outputRefs), len(result))
		fmt.Println(string(jsonData))

	case "mermaid":
		outputRefs := os.Args[1:]
		graph, err := plan(outputRefs)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
			os.Exit(1)
		}
		fmt.Println(generateMermaidDiagram(graph))
		log.Printf("Successfully generated mermaid diagram for %d nodes", len(graph.Nodes))

	case "import_test":
		log.Printf("Running in import_test mode")
		fmt.Println("ok :)")
		log.Printf("Import test completed successfully")

	default:
		log.Printf("Error: Unknown mode '%s'", mode)
		fmt.Fprintf(os.Stderr, "Unknown MODE `%s`\n", mode)
		os.Exit(1)
	}
}