Parallel analyze

This commit is contained in:
Stuart Axelbrooke 2025-04-20 09:46:33 -07:00
parent 0cb2737701
commit 13fc7a7c63
No known key found for this signature in database
GPG key ID: 1B0A848C29D46A35
5 changed files with 248 additions and 385 deletions

View file

@ -1,9 +0,0 @@
directories:
./
targets:
//runtime/...
//:all
//job/...
//graph/...

2
.gitignore vendored
View file

@ -2,3 +2,5 @@ bazel-*
.ijwb .ijwb
databuild.iml databuild.iml
.idea .idea
.DS_Store
examples/podcast_reviews/data

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,6 @@
# Podcast Reviews Example
## Input Data
Get it from [here](https://www.kaggle.com/datasets/thoughtvector/podcastreviews/versions/28?select=database.sqlite)! (and put it in `examples/podcast_reviews/data/ingest/database.sqlite`)

View file

@ -7,6 +7,9 @@ import (
"os/exec" "os/exec"
"strings" "strings"
"log" "log"
"sync"
"runtime"
"strconv"
) )
// DataDepType represents the type of data dependency // DataDepType represents the type of data dependency
@ -146,6 +149,83 @@ func resolve(outputRefs []string) (map[string][]string, error) {
return result, nil return result, nil
} }
// configureParallel runs configure for every job in jobRefs concurrently,
// using at most numWorkers goroutines, and returns the combined task list.
// If any configure call fails, in-flight work is allowed to finish, further
// work is abandoned, and the first error observed is returned (with a nil
// task slice). Task ordering in the result is nondeterministic.
func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, error) {
	type job struct {
		jobLabel     string
		producedRefs []string
	}

	// Feed all jobs through a buffered channel so workers can pull from it
	// without extra coordination; closing it signals "no more work".
	jobsChan := make(chan job, len(jobRefs))
	for jobLabel, producedRefs := range jobRefs {
		jobsChan <- job{jobLabel, producedRefs}
	}
	close(jobsChan)

	// Shared state guarded by mu: the accumulated tasks and the first error
	// seen. A mutex-guarded slice replaces the original tasks/error channel
	// plumbing and the trailing select, which were redundant.
	var (
		mu       sync.Mutex
		allTasks []Task
		firstErr error
	)

	// Don't spin up more goroutines than there are jobs to do.
	if numWorkers > len(jobRefs) {
		numWorkers = len(jobRefs)
	}

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobsChan {
				// Bail out early if another worker has already failed.
				mu.Lock()
				failed := firstErr != nil
				mu.Unlock()
				if failed {
					return
				}

				tasks, err := configure(j.jobLabel, j.producedRefs)

				mu.Lock()
				if err != nil {
					// Record only the first error; later ones are dropped,
					// matching the original behavior.
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}
				allTasks = append(allTasks, tasks...)
				mu.Unlock()
			}
		}()
	}

	// All workers must finish before the shared state is read.
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	return allTasks, nil
}
// plan creates a job graph for given output references // plan creates a job graph for given output references
func plan(outputRefs []string) (*JobGraph, error) { func plan(outputRefs []string) (*JobGraph, error) {
log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs) log.Printf("Starting planning for %d output refs: %v", len(outputRefs), outputRefs)
@ -156,6 +236,20 @@ func plan(outputRefs []string) (*JobGraph, error) {
epoch := 0 epoch := 0
var nodes []Task var nodes []Task
// Determine the number of workers based on available CPU cores or environment variable
numWorkers := runtime.NumCPU()
if workerEnv := os.Getenv("DATABUILD_PARALLEL_WORKERS"); workerEnv != "" {
if parsedWorkers, err := strconv.Atoi(workerEnv); err != nil {
log.Printf("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '%s', using default: %d", workerEnv, numWorkers)
} else if parsedWorkers < 1 {
numWorkers = 1
log.Printf("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: %d", numWorkers)
} else {
numWorkers = parsedWorkers
}
}
log.Printf("Using %d workers for parallel execution", numWorkers)
for len(unhandledRefs) > 0 { for len(unhandledRefs) > 0 {
if epoch >= 1000 { if epoch >= 1000 {
log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch) log.Printf("Planning timeout: still planning after %d epochs, giving up", epoch)
@ -173,16 +267,14 @@ func plan(outputRefs []string) (*JobGraph, error) {
return nil, err return nil, err
} }
// Configure jobs // Configure jobs in parallel
var newNodes []Task newNodes, err := configureParallel(jobRefs, numWorkers)
for jobLabel, producedRefs := range jobRefs { if err != nil {
tasks, err := configure(jobLabel, producedRefs) return nil, err
if err != nil { }
return nil, err
}
newNodes = append(newNodes, tasks...)
// Remove handled refs // Remove handled refs
for _, producedRefs := range jobRefs {
for _, ref := range producedRefs { for _, ref := range producedRefs {
delete(unhandledRefs, ref) delete(unhandledRefs, ref)
} }