Remove go analyzer

This commit is contained in:
Stuart Axelbrooke 2025-05-07 16:47:59 -07:00
parent 2cbd69f935
commit f5eeeeb3dd
No known key found for this signature in database
GPG key ID: 1B0A848C29D46A35
6 changed files with 25 additions and 516 deletions

View file

@ -15,7 +15,7 @@ crate = use_extension("@rules_rust//crate_universe:extensions.bzl", "crate")
crate.spec(package = "serde", features = ["derive"], version = "1.0")
crate.spec(package = "serde_json", version = "1.0")
crate.spec(package = "log", version = "0.4")
crate.spec(package = "simple_logger", version = "4.3")
crate.spec(package = "simple_logger", features=["stderr"], version = "4.3")
crate.spec(package = "crossbeam-channel", version = "0.5")
crate.spec(package = "num_cpus", version = "1.16")
crate.spec(package = "tokio", default_features = False, features = ["macros", "net", "rt-multi-thread"], version = "1.38")

File diff suppressed because one or more lines are too long

View file

@ -24,3 +24,13 @@ sh_test(
"//:basic_graph.analyze",
],
)
sh_test(
name = "exec_test",
srcs = ["exec_test.sh"],
data = [
"//:basic_graph.exec",
"//:basic_graph.build",
"//:basic_graph.analyze",
],
)

0
examples/basic_graph/test/exec_test.sh Normal file → Executable file
View file

View file

@ -7,12 +7,6 @@ exports_files([
"rust_analyze_wrapper.sh.tpl",
])
go_binary(
name = "analyze_go",
srcs = ["analyze.go"],
visibility = ["//visibility:public"],
)
go_binary(
name = "execute",
srcs = ["execute.go"],

View file

@ -1,495 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"log"
"sync"
"runtime"
"strconv"
)
// DataDepType represents the type of data dependency
type DataDepType string
const (
Query DataDepType = "query"
Materialize DataDepType = "materialize"
)
// DataDep represents a data dependency
type DataDep struct {
DepType DataDepType `json:"depType"`
Ref string `json:"ref"`
}
// JobConfig represents the configuration for a job
type JobConfig struct {
Inputs []DataDep `json:"inputs"`
Outputs []string `json:"outputs"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
}
// Task represents a job task
type Task struct {
JobLabel string `json:"jobLabel"`
Config JobConfig `json:"config"`
}
// JobGraph represents a graph of jobs
type JobGraph struct {
Outputs []string `json:"outputs"`
Nodes []Task `json:"nodes"`
}
// PartitionManifest represents a partition manifest
type PartitionManifest struct {
Outputs string `json:"outputs"`
Inputs []DataDep `json:"inputs"`
StartTime int `json:"startTime"`
EndTime int `json:"endTime"`
Config JobConfig `json:"config"`
}
// jobLabelToCfgPath converts a job label to a configuration path
func jobLabelToCfgPath(jobLabel string) string {
return "." + strings.Replace(strings.Replace(jobLabel, "//", "", -1), ":", "/", -1) + ".cfg"
}
// configure configures the specified job to produce the desired outputs
func configure(jobLabel string, outputRefs []string) ([]Task, error) {
candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
var jobPathMap map[string]string
if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %v", err)
}
// Look up the executable path for this job
execPath, ok := jobPathMap[jobLabel]
if !ok {
return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
}
// Check if executable exists
if _, err := os.Stat(execPath); err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("executable not found at path: %s", execPath)
}
return nil, fmt.Errorf("error checking executable: %v", err)
}
log.Printf("Executing job configuration: %s %v", execPath, outputRefs)
cmd := exec.Command(execPath, outputRefs...)
var stdout, stderr strings.Builder
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
log.Printf("Job configuration failed: %s", stderr.String())
return nil, fmt.Errorf("Failed to run job config: %s", stderr.String())
}
log.Printf("Job configuration succeeded for %s", jobLabel)
// Parse the job configurations
var jobConfigs []JobConfig
err = json.Unmarshal([]byte(stdout.String()), &jobConfigs)
if err != nil {
log.Printf("Error parsing job configs for %s: %s. `%s`", jobLabel, err, stdout.String())
return nil, fmt.Errorf("Failed to parse job configs: %s", err)
}
// Create tasks
tasks := make([]Task, len(jobConfigs))
for i, cfg := range jobConfigs {
tasks[i] = Task{
JobLabel: jobLabel,
Config: cfg,
}
}
log.Printf("Created %d tasks for job %s", len(tasks), jobLabel)
return tasks, nil
}
// resolve produces a mapping of required job refs to the partitions it produces
func resolve(outputRefs []string) (map[string][]string, error) {
lookupPath := os.Getenv("DATABUILD_JOB_LOOKUP_PATH")
// Run the job lookup
log.Printf("Executing job lookup: %s %v", lookupPath, outputRefs)
cmd := exec.Command(lookupPath, outputRefs...)
var stdout, stderr strings.Builder
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
log.Printf("Job lookup failed: %s", stderr.String())
return nil, fmt.Errorf("Failed to run job lookup: %s", stderr.String())
}
log.Printf("Job lookup succeeded for %d output refs", len(outputRefs))
// Parse the result
var result map[string][]string
err = json.Unmarshal([]byte(stdout.String()), &result)
if err != nil {
log.Printf("Error parsing job lookup result: %s", err)
return nil, fmt.Errorf("Failed to parse job lookup result: %s", err)
}
log.Printf("Job lookup found %d job mappings", len(result))
for job, refs := range result {
log.Printf(" Job %s produces %d refs", job, len(refs))
}
return result, nil
}
// configureParallel configures multiple jobs in parallel
func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, error) {
var wg sync.WaitGroup
tasksChan := make(chan []Task, len(jobRefs))
errorChan := make(chan error, len(jobRefs))
jobsChan := make(chan struct {
jobLabel string
producedRefs []string
}, len(jobRefs))
// Use a mutex to protect access to the error variable
var mu sync.Mutex
var firstErr error
// Fill the jobs channel
for jobLabel, producedRefs := range jobRefs {
jobsChan <- struct {
jobLabel string
producedRefs []string
}{jobLabel, producedRefs}
}
close(jobsChan)
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobsChan {
// Check if an error has already occurred
mu.Lock()
if firstErr != nil {
mu.Unlock()
return
}
mu.Unlock()
tasks, err := configure(job.jobLabel, job.producedRefs)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err
errorChan <- err
}
mu.Unlock()
return
}
tasksChan <- tasks
}
}()
}
// Wait for all workers to finish
go func() {
wg.Wait()
close(tasksChan)
close(errorChan)
}()
// Collect results
var allTasks []Task
for tasks := range tasksChan {
allTasks = append(allTasks, tasks...)
}
// Check for errors
select {
case err := <-errorChan:
if err != nil {
return nil, err
}
default:
}
return allTasks, nil
}
// plan creates a job graph for given output references
func plan(outputRefs []string) (*JobGraph, error) {
log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs)
unhandledRefs := make(map[string]bool)
for _, ref := range outputRefs {
unhandledRefs[ref] = true
}
epoch := 0
var nodes []Task
// Determine the number of workers based on available CPU cores or environment variable
numWorkers := runtime.NumCPU()
if workerEnv := os.Getenv("DATABUILD_PARALLEL_WORKERS"); workerEnv != "" {
if parsedWorkers, err := strconv.Atoi(workerEnv); err != nil {
log.Printf("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '%s', using default: %d", workerEnv, numWorkers)
} else if parsedWorkers < 1 {
numWorkers = 1
log.Printf("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: %d", numWorkers)
} else {
numWorkers = parsedWorkers
}
}
log.Printf("Using %d workers for parallel execution", numWorkers)
for len(unhandledRefs) > 0 {
if epoch >= 1000 {
log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch)
return nil, fmt.Errorf("Still planning after %d epochs, giving up", epoch)
}
log.Printf("Planning epoch %d with %d unhandled refs", epoch, len(unhandledRefs))
// Resolve jobs for all unhandled refs
unhandledRefsList := make([]string, 0, len(unhandledRefs))
for ref := range unhandledRefs {
unhandledRefsList = append(unhandledRefsList, ref)
}
jobRefs, err := resolve(unhandledRefsList)
if err != nil {
return nil, err
}
// Configure jobs in parallel
newNodes, err := configureParallel(jobRefs, numWorkers)
if err != nil {
return nil, err
}
// Remove handled refs
for _, producedRefs := range jobRefs {
for _, ref := range producedRefs {
delete(unhandledRefs, ref)
}
}
if len(unhandledRefs) > 0 {
log.Printf("Error: Still have unhandled refs after configuration phase: %v", unhandledRefs)
return nil, fmt.Errorf("Should have no unhandled refs after configuration phase, but had: %v", unhandledRefs)
}
epoch++
// Add new nodes to the graph
nodes = append(nodes, newNodes...)
log.Printf("Planning epoch %d completed: added %d new nodes, total nodes: %d", epoch, len(newNodes), len(nodes))
// Plan next epoch
newUnhandledCount := 0
for _, task := range newNodes {
for _, input := range task.Config.Inputs {
if input.DepType == Materialize {
if !unhandledRefs[input.Ref] {
newUnhandledCount++
}
unhandledRefs[input.Ref] = true
}
}
}
if newUnhandledCount > 0 {
log.Printf("Added %d new unhandled refs for next planning epoch", newUnhandledCount)
}
}
if len(nodes) > 0 {
log.Printf("Planning complete: created graph with %d nodes for %d output refs", len(nodes), len(outputRefs))
return &JobGraph{
Outputs: outputRefs,
Nodes: nodes,
}, nil
} else {
log.Printf("Planning failed: no nodes created for output refs %v", outputRefs)
return nil, fmt.Errorf("Unknown failure in graph planning")
}
}
// generateMermaidDiagram generates a Mermaid flowchart diagram from a job graph
func generateMermaidDiagram(graph *JobGraph) string {
// Start the mermaid flowchart
mermaid := "flowchart TD\n"
// Track nodes we've already added to avoid duplicates
addedNodes := make(map[string]bool)
addedRefs := make(map[string]bool)
// Map to track which refs are outputs (to highlight them)
isOutputRef := make(map[string]bool)
for _, ref := range graph.Outputs {
isOutputRef[ref] = true
}
// Process each task in the graph
for _, task := range graph.Nodes {
// Create a unique ID for this job+outputs combination
outputsKey := strings.Join(task.Config.Outputs, "_")
jobNodeId := "job_" + strings.Replace(task.JobLabel, "//", "_", -1)
jobNodeId = strings.Replace(jobNodeId, ":", "_", -1)
jobNodeId = jobNodeId + "_" + strings.Replace(outputsKey, "/", "_", -1)
// Create a descriptive label that includes both job label and outputs
jobLabel := task.JobLabel
outputsLabel := ""
if len(task.Config.Outputs) > 0 {
if len(task.Config.Outputs) == 1 {
outputsLabel = " [" + task.Config.Outputs[0] + "]"
} else {
outputsLabel = " [" + task.Config.Outputs[0] + ", ...]"
}
}
// Add the job node if not already added
if !addedNodes[jobNodeId] {
// Represent job as a process shape with escaped label
mermaid += fmt.Sprintf(" %s[\"`**%s** %s`\"]:::job\n",
jobNodeId,
jobLabel,
outputsLabel)
addedNodes[jobNodeId] = true
}
// Process inputs (dependencies)
for _, input := range task.Config.Inputs {
refNodeId := "ref_" + strings.Replace(input.Ref, "/", "_", -1)
// Add the partition ref node if not already added
if !addedRefs[refNodeId] {
node_class := "partition"
// Apply output styling immediately if this is an output ref
if isOutputRef[input.Ref] {
//mermaid += fmt.Sprintf(" class %s outputPartition;\n", refNodeId)
node_class = "outputPartition"
}
// Represent partition as a cylinder
mermaid += fmt.Sprintf(" %s[(\"%s\")]:::%s\n",
refNodeId,
input.Ref,
node_class,
)
addedRefs[refNodeId] = true
}
// Add the edge from input to job
if input.DepType == Materialize {
// Solid line for materialize dependencies
mermaid += fmt.Sprintf(" %s --> %s\n", refNodeId, jobNodeId)
} else {
// Dashed line for query dependencies
mermaid += fmt.Sprintf(" %s -.-> %s\n", refNodeId, jobNodeId)
}
}
// Process outputs
for _, output := range task.Config.Outputs {
refNodeId := "ref_" + strings.Replace(output, "/", "_", -1)
// Add the partition ref node if not already added
if !addedRefs[refNodeId] {
node_class := "partition"
// Apply output styling immediately if this is an output ref
if isOutputRef[output] {
//mermaid += fmt.Sprintf(" class %s outputPartition;\n", refNodeId)
node_class = "outputPartition"
}
// Represent partition as a cylinder
mermaid += fmt.Sprintf(" %s[(\"Partition: %s\")]:::%s\n",
refNodeId,
output,
node_class)
addedRefs[refNodeId] = true
}
// Add the edge from job to output
mermaid += fmt.Sprintf(" %s --> %s\n", jobNodeId, refNodeId)
}
}
// Add styling
mermaid += "\n %% Styling\n"
mermaid += " classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n"
mermaid += " classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n"
mermaid += " classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n"
return mermaid
}
func main() {
mode := os.Getenv("DATABUILD_MODE")
log.Printf("Starting analyze.go in mode: %s", mode)
if mode == "plan" {
// Get output refs from command line arguments
outputRefs := os.Args[1:]
graph, err := plan(outputRefs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
os.Exit(1)
}
// Output the job graph as JSON
jsonData, err := json.Marshal(graph)
if err != nil {
log.Printf("Error marshaling job graph: %s", err)
fmt.Fprintf(os.Stderr, "Error marshaling job graph: %s\n", err)
os.Exit(1)
}
log.Printf("Successfully generated job graph with %d nodes", len(graph.Nodes))
fmt.Println(string(jsonData))
} else if mode == "lookup" {
// Get output refs from command line arguments
outputRefs := os.Args[1:]
result, err := resolve(outputRefs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
os.Exit(1)
}
// Output the result as JSON
jsonData, err := json.Marshal(result)
if err != nil {
log.Printf("Error marshaling lookup result: %s", err)
fmt.Fprintf(os.Stderr, "Error marshaling lookup result: %s\n", err)
os.Exit(1)
}
log.Printf("Successfully completed lookup for %d output refs with %d job mappings", len(outputRefs), len(result))
fmt.Println(string(jsonData))
} else if mode == "mermaid" {
// Get output refs from command line arguments
outputRefs := os.Args[1:]
graph, err := plan(outputRefs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
os.Exit(1)
}
// Generate and output the mermaid diagram
mermaidDiagram := generateMermaidDiagram(graph)
fmt.Println(mermaidDiagram)
log.Printf("Successfully generated mermaid diagram for %d nodes", len(graph.Nodes))
} else if mode == "import_test" {
log.Printf("Running in import_test mode")
fmt.Println("ok :)")
log.Printf("Import test completed successfully")
} else {
log.Printf("Error: Unknown mode '%s'", mode)
fmt.Fprintf(os.Stderr, "Unknown MODE `%s`\n", mode)
os.Exit(1)
}
}