Compare commits
No commits in common. "bef37cd8ab8c1ee7105be90e23e725fcc508d5f0" and "3a9fd6a800b0e67c5295a83095b8bef32a3e5987" have entirely different histories.
bef37cd8ab...3a9fd6a800
31 changed files with 153 additions and 1174 deletions
.gitignore (vendored): 3 changes

@@ -16,6 +16,3 @@ generated_number
 target
 logs/databuild/
 **/logs/databuild/
-
-# DSL generated code
-**/generated/
@@ -8,7 +8,7 @@ use databuild::repositories::{
     builds::BuildsRepository
 };
 use clap::{Arg, Command as ClapCommand, ArgMatches};
-use log::{info, error};
+use log::info;
 use simple_logger::SimpleLogger;
 use std::env;
 use std::process::{Command, Stdio};

@@ -26,21 +26,26 @@ async fn run_analysis(
     info!("Running analysis for partitions: {:?}", partitions);

     // Get required environment variables
-    let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
-        .map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS_CFG not set".to_string()))?;
+    let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS")
+        .map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS not set".to_string()))?;
     let job_lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
         .map_err(|_| CliError::Environment("DATABUILD_JOB_LOOKUP_PATH not set".to_string()))?;
     let graph_label = env::var("DATABUILD_GRAPH_LABEL")
         .map_err(|_| CliError::Environment("DATABUILD_GRAPH_LABEL not set".to_string()))?;

     // Find analyze binary using runfiles
-    let analyze_path = env::var("DATABUILD_ANALYZE_BINARY")
-        .map_err(|_| CliError::Environment("DATABUILD_ANALYZE_BINARY not set".to_string()))?;
+    let analyze_path = env::var("RUNFILES_DIR")
+        .map(|runfiles_dir| format!("{}/databuild+/databuild/graph/analyze", runfiles_dir))
+        .or_else(|_| {
+            // Fallback for direct execution
+            Ok("./databuild/graph/analyze".to_string())
+        })
+        .map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate analyze binary: {}", e)))?;

     // Build analyze command
     let cmd = Command::new(analyze_path)
         .args(partitions)
-        .env("DATABUILD_CANDIDATE_JOBS_CFG", candidate_jobs)
+        .env("DATABUILD_CANDIDATE_JOBS", candidate_jobs)
         .env("DATABUILD_JOB_LOOKUP_PATH", job_lookup_path)
         .env("DATABUILD_GRAPH_LABEL", graph_label)
         .env("DATABUILD_MODE", "plan")

@@ -78,17 +83,22 @@ async fn run_execution(
         .map_err(|e| CliError::Execution(format!("Failed to serialize job graph: {}", e)))?;

     // Get required environment variables
-    let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
-        .map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS_CFG not set".to_string()))?;
+    let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS")
+        .map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS not set".to_string()))?;
     let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").unwrap_or_else(|_| "stdout".to_string());

     // Find execute binary using runfiles
-    let execute_path = env::var("DATABUILD_EXECUTE_BINARY")
-        .map_err(|_| CliError::Environment("DATABUILD_EXECUTE_BINARY not set".to_string()))?;
+    let execute_path = env::var("RUNFILES_DIR")
+        .map(|runfiles_dir| format!("{}/databuild+/databuild/graph/execute", runfiles_dir))
+        .or_else(|_| {
+            // Fallback for direct execution
+            Ok("./databuild/graph/execute".to_string())
+        })
+        .map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate execute binary: {}", e)))?;

     // Build execute command
     let mut cmd = Command::new(execute_path)
-        .env("DATABUILD_CANDIDATE_JOBS_CFG", candidate_jobs)
+        .env("DATABUILD_CANDIDATE_JOBS", candidate_jobs)
         .env("DATABUILD_BUILD_EVENT_LOG", build_event_log_uri)
         .env("DATABUILD_BUILD_REQUEST_ID", orchestrator.build_request_id())
         .stdin(Stdio::piped())

@@ -109,8 +119,7 @@ async fn run_execution(

     if !output.status.success() {
         let stderr = String::from_utf8_lossy(&output.stderr);
-        error!("Execution failed:\n{}", stderr);
-        return Err(CliError::Execution("Execution failed".to_string()));
+        return Err(CliError::Execution(format!("Execution failed: {}", stderr)));
     }

     // For now, assume success if the command completed without error
@@ -6,17 +6,3 @@ py_library(
         "//databuild:py_proto",
     ],
 )
-
-py_binary(
-    name = "generator",
-    srcs = ["generator.py"],
-    main = "generator.py",
-    data = ["dsl_job_wrapper.py"],
-    visibility = ["//visibility:public"],
-    deps = [
-        ":dsl",
-        "//databuild:py_proto",
-        # Include test app modules for testing
-        "//databuild/test/app/dsl:dsl_src",
-    ],
-)
@@ -65,7 +65,7 @@ class DataBuildJob(Protocol):

     def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...

-    def exec(self, *args: str) -> None: ...
+    def exec(self, config: JobConfig) -> None: ...


 class DataBuildGraph:

@@ -84,291 +84,6 @@ class DataBuildGraph:
         """Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
         raise NotImplementedError

-    def generate_bazel_package(self, name: str, output_dir: str) -> None:
-        """Generate BUILD.bazel and binaries into a generated/ subdirectory.
-
-        Args:
-            name: Base name for the generated graph (without .generate suffix)
-            output_dir: Directory to write generated files to (will create generated/ subdir)
-        """
-        import os
-        import shutil
-
-        # Create generated/ subdirectory
-        generated_dir = os.path.join(output_dir, "generated")
-        os.makedirs(generated_dir, exist_ok=True)
-
-        # Generate BUILD.bazel with job and graph targets
-        self._generate_build_bazel(generated_dir, name)
-
-        # Generate individual job scripts (instead of shared wrapper)
-        self._generate_job_scripts(generated_dir)
-
-        # Generate job lookup binary
-        self._generate_job_lookup(generated_dir, name)
-
-        package_name = self._get_package_name()
-        print(f"Generated DataBuild package '{name}' in {generated_dir}")
-        if package_name != "UNKNOWN_PACKAGE":
-            print(f"Run 'bazel build //{package_name}/generated:{name}_graph.analyze' to use the generated graph")
-        else:
-            print(f"Run 'bazel build generated:{name}_graph.analyze' to use the generated graph")
-
-    def _generate_build_bazel(self, output_dir: str, name: str) -> None:
-        """Generate BUILD.bazel with databuild_job and databuild_graph targets."""
-        import os
-
-        # Get job classes from the lookup table
-        job_classes = list(set(self.lookup.values()))
-
-        # Get parent package for dependencies
-        parent_package = self._get_package_name()
-
-        # Generate py_binary targets for each job
-        job_binaries = []
-        job_targets = []
-
-        for job_class in job_classes:
-            job_name = self._snake_case(job_class.__name__)
-            binary_name = f"{job_name}_binary"
-            job_targets.append(f'"{job_name}"')
-
-            job_script_name = f"{job_name}.py"
-            job_binaries.append(f'''py_binary(
-    name = "{binary_name}",
-    srcs = ["{job_script_name}"],
-    main = "{job_script_name}",
-    deps = ["//{parent_package}:dsl_src"],
-)
-
-databuild_job(
-    name = "{job_name}",
-    binary = ":{binary_name}",
-)''')
-
-        # Generate the complete BUILD.bazel content
-        build_content = f'''load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")
-
-# Generated by DataBuild DSL - do not edit manually
-# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel
-
-{chr(10).join(job_binaries)}
-
-py_binary(
-    name = "{name}_job_lookup",
-    srcs = ["{name}_job_lookup.py"],
-    deps = ["//{parent_package}:dsl_src"],
-)
-
-databuild_graph(
-    name = "{name}_graph",
-    jobs = [{", ".join(job_targets)}],
-    lookup = ":{name}_job_lookup",
-    visibility = ["//visibility:public"],
-)
-'''
-
-        with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:
-            f.write(build_content)
-
-    def _generate_job_scripts(self, output_dir: str) -> None:
-        """Generate individual Python scripts for each job class."""
-        import os
-
-        # Get job classes and generate a script for each one
-        job_classes = list(set(self.lookup.values()))
-        graph_module_path = self._get_graph_module_path()
-
-        for job_class in job_classes:
-            job_name = self._snake_case(job_class.__name__)
-            script_name = f"{job_name}.py"
-
-            script_content = f'''#!/usr/bin/env python3
-"""
-Generated job script for {job_class.__name__}.
-"""
-
-import sys
-import json
-from {graph_module_path} import {job_class.__name__}
-from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
-
-
-def parse_outputs_from_args(args: list[str]) -> list:
-    """Parse partition output references from command line arguments."""
-    outputs = []
-    for arg in args:
-        # Find which output type can deserialize this partition reference
-        for output_type in {job_class.__name__}.output_types:
-            try:
-                partition = output_type.deserialize(arg)
-                outputs.append(partition)
-                break
-            except ValueError:
-                continue
-        else:
-            raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {{arg}}")
-
-    return outputs
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 2:
-        raise Exception(f"Invalid command usage")
-
-    command = sys.argv[1]
-    job_instance = {job_class.__name__}()
-
-    if command == "config":
-        # Parse output partition references as PartitionRef objects (for Rust wrapper)
-        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
-
-        # Also parse them into DSL partition objects (for DSL job.config())
-        outputs = parse_outputs_from_args(sys.argv[2:])
-
-        # Call job's config method - returns list[JobConfig]
-        configs = job_instance.config(outputs)
-
-        # Wrap in JobConfigureResponse and serialize using to_dict()
-        response = JobConfigureResponse(configs=configs)
-        print(json.dumps(to_dict(response)))
-
-    elif command == "exec":
-        # The exec method expects a JobConfig but the Rust wrapper passes args
-        # For now, let the DSL job handle the args directly
-        # TODO: This needs to be refined based on actual Rust wrapper interface
-        job_instance.exec(*sys.argv[2:])
-
-    else:
-        raise Exception(f"Invalid command `{{sys.argv[1]}}`")
-'''
-
-            script_path = os.path.join(output_dir, script_name)
-            with open(script_path, "w") as f:
-                f.write(script_content)
-
-            # Make it executable
-            os.chmod(script_path, 0o755)
-
-    def _generate_job_lookup(self, output_dir: str, name: str) -> None:
-        """Generate job lookup binary that maps partition patterns to job targets."""
-        import os
-
-        # Build the job lookup mappings with full package paths
-        package_name = self._get_package_name()
-        lookup_mappings = []
-        for partition_type, job_class in self.lookup.items():
-            job_name = self._snake_case(job_class.__name__)
-            pattern = partition_type._raw_pattern
-            full_target = f"//{package_name}/generated:{job_name}"
-            lookup_mappings.append(f' r"{pattern}": "{full_target}",')
-
-        lookup_content = f'''#!/usr/bin/env python3
-"""
-Generated job lookup for DataBuild DSL graph.
-Maps partition patterns to job targets.
-"""
-
-import sys
-import re
-import json
-from collections import defaultdict
-
-
-# Mapping from partition patterns to job targets
-JOB_MAPPINGS = {{
-{chr(10).join(lookup_mappings)}
-}}
-
-
-def lookup_job_for_partition(partition_ref: str) -> str:
-    """Look up which job can build the given partition reference."""
-    for pattern, job_target in JOB_MAPPINGS.items():
-        if re.match(pattern, partition_ref):
-            return job_target
-
-    raise ValueError(f"No job found for partition: {{partition_ref}}")
-
-
-def main():
-    if len(sys.argv) < 2:
-        print("Usage: job_lookup.py <partition_ref> [partition_ref...]", file=sys.stderr)
-        sys.exit(1)
-
-    results = defaultdict(list)
-    try:
-        for partition_ref in sys.argv[1:]:
-            job_target = lookup_job_for_partition(partition_ref)
-            results[job_target].append(partition_ref)
-
-        # Output the results as JSON (matching existing lookup format)
-        print(json.dumps(dict(results)))
-    except ValueError as e:
-        print(f"ERROR: {{e}}", file=sys.stderr)
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
-'''
-
-        lookup_file = os.path.join(output_dir, f"{name}_job_lookup.py")
-        with open(lookup_file, "w") as f:
-            f.write(lookup_content)
-
-        # Make it executable
-        os.chmod(lookup_file, 0o755)
-
-    def _snake_case(self, name: str) -> str:
-        """Convert CamelCase to snake_case."""
-        import re
-        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
-        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
-
-    def _get_graph_module_path(self) -> str:
-        """Get the module path for the graph containing this instance."""
-        # Try to find the module by looking at where the graph object is defined
-        import inspect
-        import sys
-
-        # Look through all loaded modules to find where this graph instance is defined
-        for module_name, module in sys.modules.items():
-            if hasattr(module, 'graph') and getattr(module, 'graph') is self:
-                if module_name != '__main__':
-                    return module_name
-
-        # Look through the call stack to find the module that imported us
-        for frame_info in inspect.stack():
-            frame_globals = frame_info.frame.f_globals
-            module_name = frame_globals.get('__name__')
-            if module_name and module_name != '__main__' and 'graph' in frame_globals:
-                # Check if this frame has our graph
-                if frame_globals.get('graph') is self:
-                    return module_name
-
-        # Last resort fallback - this will need to be manually configured
-        return "UNKNOWN_MODULE"
-
-    def _get_package_name(self) -> str:
-        """Get the Bazel package name where the DSL source files are located."""
-        # Extract package from the graph label if available
-        if hasattr(self, 'label') and self.label.startswith('//'):
-            # Extract package from label like "//databuild/test/app:dsl_graph"
-            package_part = self.label.split(':')[0]
-            return package_part[2:]  # Remove "//" prefix
-
-        # Fallback to trying to infer from module path
-        module_path = self._get_graph_module_path()
-        if module_path != "UNKNOWN_MODULE":
-            # Convert module path to package path
-            # e.g., "databuild.test.app.dsl.graph" -> "databuild/test/app/dsl"
-            parts = module_path.split('.')
-            if parts[-1] in ['graph', 'main']:
-                parts = parts[:-1]
-            return '/'.join(parts)
-
-        return "UNKNOWN_PACKAGE"
-

 @dataclass
 class JobConfigBuilder:
@@ -1,118 +0,0 @@
-#!/usr/bin/env python3
-"""
-Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph.
-Configured via environment variables:
-- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph')
-- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes')
-"""
-
-import sys
-import json
-import os
-import importlib
-from typing import List, Any
-from databuild.proto import JobConfig
-
-
-def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]:
-    """Parse partition output references from command line arguments into partition objects."""
-    outputs = []
-    for arg in args:
-        # Find which output type can deserialize this partition reference
-        for output_type in job_class.output_types:
-            try:
-                partition = output_type.deserialize(arg)
-                outputs.append(partition)
-                break
-            except ValueError:
-                continue
-        else:
-            raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}")
-
-    return outputs
-
-
-def main():
-    if len(sys.argv) < 2:
-        print("Usage: dsl_job_wrapper.py <config|exec> [args...]", file=sys.stderr)
-        sys.exit(1)
-
-    command = sys.argv[1]
-
-    # Read configuration from environment
-    graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE')
-    job_class_name = os.environ.get('DATABUILD_JOB_CLASS')
-
-    if not graph_module_path:
-        print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr)
-        sys.exit(1)
-
-    if not job_class_name:
-        print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr)
-        sys.exit(1)
-
-    try:
-        # Import the graph module
-        module = importlib.import_module(graph_module_path)
-        graph = getattr(module, 'graph')
-
-        # Get the job class
-        job_class = getattr(module, job_class_name)
-
-        # Create job instance
-        job_instance = job_class()
-
-    except (ImportError, AttributeError) as e:
-        print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr)
-        sys.exit(1)
-
-    if command == "config":
-        try:
-            # Parse output partition references from remaining args
-            output_refs = sys.argv[2:]
-            if not output_refs:
-                print("ERROR: No output partition references provided", file=sys.stderr)
-                sys.exit(1)
-
-            outputs = parse_outputs_from_args(output_refs, job_class)
-
-            # Call job's config method
-            configs = job_instance.config(outputs)
-
-            # Output each config as JSON (one per line for multiple configs)
-            for config in configs:
-                # Convert JobConfig to dict for JSON serialization
-                config_dict = {
-                    'outputs': [{'str': ref.str} for ref in config.outputs],
-                    'inputs': [
-                        {
-                            'dep_type_code': dep.dep_type_code,
-                            'dep_type_name': dep.dep_type_name,
-                            'partition_ref': {'str': dep.partition_ref.str}
-                        } for dep in config.inputs
-                    ],
-                    'args': config.args,
-                    'env': config.env,
-                }
-                print(json.dumps(config_dict))
-
-        except Exception as e:
-            print(f"ERROR: Config failed: {e}", file=sys.stderr)
-            sys.exit(1)
-
-    elif command == "exec":
-        try:
-            # Read config from stdin
-            job_instance.exec(*sys.argv[2:])
-
-        except Exception as e:
-            print(f"ERROR: Execution failed: {e}", file=sys.stderr)
-            sys.exit(1)
-
-    else:
-        print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr)
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
@@ -1,47 +0,0 @@
-#!/usr/bin/env python3
-"""
-DSL code generator that can be run as a py_binary with proper dependencies.
-"""
-
-import sys
-import os
-import importlib
-
-
-def main():
-    if len(sys.argv) != 4:
-        print("Usage: generator.py <module_path> <graph_attr> <output_dir>", file=sys.stderr)
-        sys.exit(1)
-
-    module_path = sys.argv[1]
-    graph_attr = sys.argv[2]
-    output_dir = sys.argv[3]
-
-    # Extract the base name from the output directory for naming
-    name = os.path.basename(output_dir.rstrip('/')) or "graph"
-
-    try:
-        # Import the graph module
-        module = importlib.import_module(module_path)
-        graph = getattr(module, graph_attr)
-
-        # Generate the bazel package
-        graph.generate_bazel_package(name, output_dir)
-
-        print(f"Generated DataBuild DSL package in {output_dir}")
-
-    except ImportError as e:
-        print(f"ERROR: Failed to import {graph_attr} from {module_path}: {e}", file=sys.stderr)
-        sys.exit(1)
-    except AttributeError as e:
-        print(f"ERROR: Module {module_path} does not have attribute {graph_attr}: {e}", file=sys.stderr)
-        sys.exit(1)
-    except Exception as e:
-        print(f"ERROR: Generation failed: {e}", file=sys.stderr)
-        import traceback
-        traceback.print_exc()
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
@@ -37,14 +37,6 @@ pub struct SqliteBuildEventLog {

 impl SqliteBuildEventLog {
     pub async fn new(path: &str) -> Result<Self> {
-        // Create parent directory if it doesn't exist
-        if let Some(parent) = std::path::Path::new(path).parent() {
-            std::fs::create_dir_all(parent)
-                .map_err(|e| BuildEventLogError::ConnectionError(
-                    format!("Failed to create directory {}: {}", parent.display(), e)
-                ))?;
-        }
-
         let conn = Connection::open(path)
             .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;

@@ -13,11 +13,11 @@ use databuild::mermaid_utils::generate_mermaid_diagram;

 // Configure a job to produce the desired outputs
 fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
-    let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
-        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
+    let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
+        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;

     let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
-        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
+        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;

     // Look up the executable path for this job
     let exec_path = job_path_map.get(job_label)
@@ -9,7 +9,7 @@ use std::io::{BufReader, Read, Write};
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::sync::Arc;
-use std::{env, thread};
+use std::thread;
 use std::time::{Duration, Instant};
 // Command line parsing removed - using environment variables
 use uuid::Uuid;

@@ -52,6 +52,48 @@ fn get_task_key(task: &Task) -> String {
     key_parts.join("|")
 }

+// Resolves the executable path from runfiles.
+// Mirrors the Go implementation's resolveExecutableFromRunfiles.
+fn resolve_executable_from_runfiles(job_label: &str) -> PathBuf {
+    let mut target_name = job_label.to_string();
+    if let Some(colon_index) = job_label.rfind(':') {
+        target_name = job_label[colon_index + 1..].to_string();
+    } else if let Some(name) = Path::new(job_label).file_name().and_then(|n| n.to_str()) {
+        target_name = name.to_string();
+    }
+
+    let exec_name = format!("{}.exec", target_name);
+
+    if let Ok(runfiles_dir_str) = std::env::var("RUNFILES_DIR") {
+        let path = PathBuf::from(runfiles_dir_str).join("_main").join(&exec_name);
+        debug!("Resolved executable path (RUNFILES_DIR): {}", path.display());
+        return path;
+    }
+
+    if let Ok(current_exe) = std::env::current_exe() {
+        let mut runfiles_dir_path = PathBuf::from(format!("{}.runfiles", current_exe.display()));
+        if !runfiles_dir_path.is_dir() { // Bazel often puts it next to the binary
+            if let Some(parent) = current_exe.parent() {
+                runfiles_dir_path = parent.join(format!("{}.runfiles", current_exe.file_name().unwrap_or_default().to_string_lossy()));
+            }
+        }
+
+        if runfiles_dir_path.is_dir() {
+            let path = runfiles_dir_path.join("_main").join(&exec_name);
+            debug!("Resolved executable path (derived RUNFILES_DIR): {}", path.display());
+            return path;
+        } else {
+            warn!("Warning: RUNFILES_DIR not found or invalid, and derived path {} is not a directory.", runfiles_dir_path.display());
+        }
+    } else {
+        warn!("Warning: Could not determine current executable path.");
+    }
+
+    let fallback_path = PathBuf::from(format!("{}.exec", job_label));
+    warn!("Falling back to direct executable path: {}", fallback_path.display());
+    fallback_path
+}
+
 fn worker(
     task_rx: Receiver<Arc<Task>>,
     result_tx: Sender<TaskExecutionResult>,

@@ -63,17 +105,8 @@ fn worker(
         info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key);
         let start_time = Instant::now();

-        let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS_EXEC")
-            .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_EXEC: {}", e)).unwrap();
-
-        let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
-            .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_EXEC: {}", e)).unwrap();
-
-        // Look up the executable path for this job
-        let job_label = &task.job.as_ref().unwrap().label;
-        let exec_path = job_path_map.get(job_label)
-            .ok_or_else(|| format!("Job {} is not a candidate job", job_label)).unwrap();
+        let exec_path = resolve_executable_from_runfiles(&task.job.as_ref().unwrap().label);

         let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) {
             Ok(json) => json,
             Err(e) => {

@@ -96,8 +129,7 @@ fn worker(

         // Generate a job run ID for this execution
         let job_run_id = Uuid::new_v4().to_string();

-        info!("Running job {} (Path: {}) with config: {}", job_label, exec_path, config_json);
         let mut cmd = Command::new(&exec_path);
         cmd.stdin(Stdio::piped())
             .stdout(Stdio::piped())
@@ -127,10 +127,9 @@ def _databuild_job_exec_impl(ctx):
     # Create a simple script that calls the job wrapper with the original binary
     script_content = RUNFILES_PREFIX + """
 export DATABUILD_JOB_BINARY="$(rlocation _main/{execute_path})"
-"$(rlocation _main/{wrapper_path})" exec $@
+exec "$(rlocation databuild+/databuild/job/job_wrapper)" exec "$@"
 """.format(
         execute_path = ctx.attr.execute.files_to_run.executable.short_path,
-        wrapper_path = ctx.attr._job_wrapper.files_to_run.executable.short_path,
     )

     ctx.actions.write(

@@ -389,7 +388,7 @@ def _databuild_graph_analyze_impl(ctx):
     ]) + "'"

     env_setup = """
-export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_job_env_var}"
+export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
 export DATABUILD_MODE=plan
 export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
 """.format(

@@ -480,7 +479,7 @@ def _databuild_graph_mermaid_impl(ctx):
     ]) + "'"

     env_setup = """
-export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_job_env_var}"
+export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
 export DATABUILD_MODE=mermaid
 export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
 """.format(
@@ -634,89 +633,39 @@ def _databuild_graph_build_impl(ctx):
     """Wraps the DataBuild CLI wrapper in a shell script."""
     script = ctx.actions.declare_file(ctx.label.name)

-    # Build DATABUILD_CANDIDATE_JOBS_CFG JSON string with runtime rlocation resolution
-    candidate_jobs_cfg_script_lines = ["CANDIDATE_JOBS_JSON_CFG=\"{\""]
+    # Build DATABUILD_CANDIDATE_JOBS JSON string with runtime rlocation resolution
+    candidate_jobs_script_lines = ["CANDIDATE_JOBS_JSON=\"{\""]
     for i, job in enumerate(ctx.attr.jobs):
         job_label = "//" + job.label.package + ":" + job.label.name
         configure_path = job[DataBuildJobInfo].configure.files_to_run.executable.short_path
         separator = "," if i < len(ctx.attr.jobs) - 1 else ""
-        candidate_jobs_cfg_script_lines.append(
-            'CANDIDATE_JOBS_JSON_CFG="${CANDIDATE_JOBS_JSON_CFG}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
+        candidate_jobs_script_lines.append(
+            'CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
                 job_label,
                 configure_path,
                 separator,
             ),
         )
-    candidate_jobs_cfg_script_lines.append('CANDIDATE_JOBS_JSON_CFG="${CANDIDATE_JOBS_JSON_CFG}}"')
-    candidate_jobs_cfg_script = "\n".join(candidate_jobs_cfg_script_lines)
+    candidate_jobs_script_lines.append('CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}}"')
+    candidate_jobs_script = "\n".join(candidate_jobs_script_lines)

-    # Build DATABUILD_CANDIDATE_JOBS_EXEC JSON string with runtime rlocation resolution
-    candidate_jobs_exec_script_lines = ["CANDIDATE_JOBS_JSON_EXEC=\"{\""]
-    for i, job in enumerate(ctx.attr.jobs):
-        job_label = "//" + job.label.package + ":" + job.label.name
-        configure_path = job[DataBuildJobInfo].execute.short_path
-        separator = "," if i < len(ctx.attr.jobs) - 1 else ""
-        candidate_jobs_exec_script_lines.append(
-            'CANDIDATE_JOBS_JSON_EXEC="${CANDIDATE_JOBS_JSON_EXEC}\\"%s\\":\\"$(rlocation _main/%s.exec)\\"%s"' % (
-                job_label,
-                configure_path,
-                separator,
-            ),
-        )
-    candidate_jobs_exec_script_lines.append('CANDIDATE_JOBS_JSON_EXEC="${CANDIDATE_JOBS_JSON_EXEC}}"')
-    candidate_jobs_exec_script = "\n".join(candidate_jobs_exec_script_lines)
-
     script_content = RUNFILES_PREFIX + """
-# Build DATABUILD_CANDIDATE_JOBS_CFG dynamically with proper rlocation resolution
-{candidate_jobs_cfg_script}
-{candidate_jobs_exec_script}
+# Build DATABUILD_CANDIDATE_JOBS dynamically with proper rlocation resolution
+%s

-export DATABUILD_CANDIDATE_JOBS_CFG="$CANDIDATE_JOBS_JSON_CFG"
-export DATABUILD_CANDIDATE_JOBS_EXEC="$CANDIDATE_JOBS_JSON_EXEC"
-# Resolve binary paths with error checking
-DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/{lookup_path})"
-if [[ -z "$DATABUILD_JOB_LOOKUP_PATH" || ! -f "$DATABUILD_JOB_LOOKUP_PATH" ]]; then
-echo "ERROR: Could not find job lookup binary at _main/{lookup_path}" >&2
-exit 1
-fi
-export DATABUILD_JOB_LOOKUP_PATH
-
-DATABUILD_ANALYZE_BINARY="$(rlocation _main/{analyze_path})"
-if [[ -z "$DATABUILD_ANALYZE_BINARY" || ! -f "$DATABUILD_ANALYZE_BINARY" ]]; then
-echo "ERROR: Could not find analyze binary at _main/{analyze_path}" >&2
-exit 1
-fi
-export DATABUILD_ANALYZE_BINARY
-
-DATABUILD_EXECUTE_BINARY="$(rlocation _main/{execute_path})"
-if [[ -z "$DATABUILD_EXECUTE_BINARY" || ! -f "$DATABUILD_EXECUTE_BINARY" ]]; then
-echo "ERROR: Could not find execute binary at _main/{execute_path}" >&2
-exit 1
-fi
-export DATABUILD_EXECUTE_BINARY
-
-export DATABUILD_GRAPH_LABEL="{graph_label}"
+export DATABUILD_CANDIDATE_JOBS="$CANDIDATE_JOBS_JSON"
+export DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/%s)"
+export DATABUILD_GRAPH_LABEL="%s"

 # Generate a single build request ID for the entire CLI operation
 export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())")

-# Run unified DataBuild CLI wrapper using the provided cli_wrapper attribute
-CLI_BINARY="$(rlocation _main/{cli_path})"
-if [[ -z "$CLI_BINARY" || ! -f "$CLI_BINARY" ]]; then
-echo "ERROR: Could not find CLI binary at _main/{cli_path}" >&2
-exit 1
-fi
-
-"$CLI_BINARY" "$@"
-""".format(
-        candidate_jobs_cfg_script = candidate_jobs_cfg_script,
-        candidate_jobs_exec_script = candidate_jobs_exec_script,
-        lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
-        analyze_path = ctx.attr._analyze.files_to_run.executable.short_path,
-        execute_path = ctx.attr._execute.files_to_run.executable.short_path,
-        graph_label = ctx.attr.graph_label,
-        cli_path = ctx.attr.cli_wrapper.files_to_run.executable.short_path,
+# Run unified DataBuild CLI wrapper
+"$(rlocation databuild+/databuild/cli/databuild_cli)" "$@"
+""" % (
+        candidate_jobs_script,
+        ctx.attr.lookup.files_to_run.executable.short_path,
+        ctx.attr.graph_label,
     )

     ctx.actions.write(
@@ -725,35 +674,58 @@ fi
         content = script_content,
     )

-    # Gather the configure executables
+    # Gather the configure and execute executables
     configure_executables = [
         job[DataBuildJobInfo].configure.files_to_run.executable
         for job in ctx.attr.jobs
     ]

-    # Gather the exec executables
-    exec_executables = [
-        job[DataBuildJobInfo].execute
-        for job in ctx.attr.jobs
-    ]
+    # Get the execute targets - these are the .exec files that need to be in runfiles
+    execute_executables = []
+    for job in ctx.attr.jobs:
+        # The job target itself contains references to both configure and execute
+        # We need to find the .exec target for each job
+        job_name = job.label.name
+        exec_target_name = job_name + ".exec"
+
+        # Find the .exec target in the same package
+        for attr_name in dir(job):
+            if attr_name.endswith("_exec") or exec_target_name in attr_name:
+                exec_target = getattr(job, attr_name, None)
+                if exec_target and hasattr(exec_target, "files_to_run"):
+                    execute_executables.append(exec_target.files_to_run.executable)
+                    break
+
+    # Also check if we can access exec targets directly from job dependencies
+    all_job_files = []
+    for job in ctx.attr.jobs:
+        if hasattr(job, "default_runfiles") and job.default_runfiles:
+            all_job_files.extend(job.default_runfiles.files.to_list())

-    # Create runfiles including the CLI binary, analyze/execute binaries and all dependencies
     runfiles = ctx.runfiles(
-        files = [ctx.executable.cli_wrapper, ctx.executable.lookup, ctx.executable._analyze, ctx.executable._execute] + configure_executables + exec_executables,
+        files = [ctx.executable.cli_wrapper, ctx.executable.lookup] + configure_executables + execute_executables + all_job_files,
     ).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge(
-        ctx.attr._analyze.default_runfiles,
-    ).merge(ctx.attr._execute.default_runfiles).merge(ctx.attr._bash_runfiles.default_runfiles).merge_all([job.default_runfiles for job in ctx.attr.jobs])
+        ctx.attr._bash_runfiles.default_runfiles,
+    )

-    # Merge runfiles from all configure targets
+    # Merge runfiles from all configure targets and job targets
     for job in ctx.attr.jobs:
         configure_target = job[DataBuildJobInfo].configure
         runfiles = runfiles.merge(configure_target.default_runfiles)

+        # Also merge the job's own runfiles which should include the .exec target
+        runfiles = runfiles.merge(job.default_runfiles)
+
     return [
         DefaultInfo(
             executable = script,
             runfiles = runfiles,
         ),
+        DataBuildGraphInfo(
+            analyze = ctx.attr.cli_wrapper,
+            exec = ctx.attr.cli_wrapper,
+            jobs = ctx.attr.jobs,
+        ),
     ]

 _databuild_graph_build = rule(
@@ -783,16 +755,6 @@ _databuild_graph_build = rule(
             default = Label("@bazel_tools//tools/bash/runfiles"),
             allow_files = True,
         ),
-        "_analyze": attr.label(
-            default = "@databuild//databuild/graph:analyze",
-            executable = True,
-            cfg = "target",
-        ),
-        "_execute": attr.label(
-            default = "@databuild//databuild/graph:execute",
-            executable = True,
-            cfg = "target",
-        ),
     },
     executable = True,
 )

@@ -801,7 +763,7 @@ def _databuild_graph_service_impl(ctx):
     """Implementation of the service target that runs the Build Graph Service."""
     script = ctx.actions.declare_file(ctx.label.name)

-    # Build job configurations mapping for DATABUILD_CANDIDATE_JOBS_CFG
+    # Build job configurations mapping for DATABUILD_CANDIDATE_JOBS
     config_paths = {
         "//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
         for job in ctx.attr.jobs

@@ -813,21 +775,21 @@ def _databuild_graph_service_impl(ctx):
     default_db = "sqlite:///tmp/%s.db" % ctx.label.name.replace(".", "_")

     env_setup = """
-export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_jobs}"
+export DATABUILD_CANDIDATE_JOBS="{candidate_jobs}"
 export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
 export DATABUILD_ANALYZE_BINARY=$(rlocation _main/{analyze_path})
 export DATABUILD_EXECUTE_BINARY=$(rlocation _main/{exec_path})
-export DATABUILD_SERVICE_BINARY=$(rlocation _main/{service_path})
 """.format(
         candidate_jobs = config_paths_str,
         lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
         analyze_path = ctx.attr.analyze.files_to_run.executable.short_path,
         exec_path = ctx.attr.exec.files_to_run.executable.short_path,
-        service_path = ctx.attr._service.files_to_run.executable.short_path,
     )

     # Generate a custom script instead of using the template to handle the external binary correctly
     script_content = RUNFILES_PREFIX + env_setup + """
+EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/build_graph_service")"
+
 # Always pass graph-specific configuration, allow user args to override defaults like port/host
 # Graph-specific args that should always be set:
 GRAPH_ARGS=(

@@ -848,9 +810,9 @@ fi

 # Run the service with graph-specific args + user args
 if [[ -n "${{EXECUTABLE_SUBCOMMAND:-}}" ]]; then
-    exec "${{DATABUILD_SERVICE_BINARY}}" "${{EXECUTABLE_SUBCOMMAND}}" "${{GRAPH_ARGS[@]}}" "$@"
+    exec "${{EXECUTABLE_BINARY}}" "${{EXECUTABLE_SUBCOMMAND}}" "${{GRAPH_ARGS[@]}}" "$@"
 else
-    exec "${{DATABUILD_SERVICE_BINARY}}" "${{GRAPH_ARGS[@]}}" "$@"
+    exec "${{EXECUTABLE_BINARY}}" "${{GRAPH_ARGS[@]}}" "$@"
 fi
 """.format(
     graph_label = ctx.attr.graph_label,
@@ -940,186 +902,3 @@ _databuild_graph_service = rule(
     },
     executable = True,
 )
-
-def databuild_dsl_generator(
-        name,
-        graph_file,
-        graph_attr = "graph",
-        output_package = None,
-        deps = [],
-        visibility = None):
-    """Creates a DataBuild DSL code generator that can generate BUILD.bazel and job binaries.
-
-    Args:
-        name: Name of the generator target (typically ends with .generate)
-        graph_file: Python file containing the DSL graph definition
-        graph_attr: Name of the graph attribute in the module (default: "graph")
-        output_package: Target package for generated files (default: current package)
-        deps: Dependencies needed to load the graph
-        visibility: Visibility specification
-    """
-    if not output_package:
-        output_package = "//" + native.package_name()
-
-    _databuild_dsl_generator_rule(
-        name = name,
-        graph_file = graph_file,
-        graph_attr = graph_attr,
-        output_package = output_package,
-        deps = deps,
-        visibility = visibility,
-    )
-
-def _databuild_dsl_generator_impl(ctx):
-    """Implementation of the DSL generator rule."""
-    script = ctx.actions.declare_file(ctx.label.name + "_generator.py")
-
-    # Get the module path from the graph file
-    graph_file_path = ctx.file.graph_file.short_path
-    if graph_file_path.endswith(".py"):
-        graph_file_path = graph_file_path[:-3]
-    module_path = graph_file_path.replace("/", ".")
-
-    # Get the package path for output
-    package_path = ctx.attr.output_package.strip("//").replace(":", "/")
-
-    script_content = """#!/usr/bin/env python3
-import os
-import sys
-
-# Determine output directory
-workspace_root = os.environ.get('BUILD_WORKSPACE_DIRECTORY')
-if workspace_root:
-    # Running with bazel run - write to source tree
-    output_dir = os.path.join(workspace_root, '{package_path}')
-else:
-    # Running with bazel build - write to current directory (bazel-bin)
-    output_dir = '.'
-
-print(f"Generating DataBuild DSL code to {{output_dir}}")
-
-try:
-    # Setup runfiles for module resolution
-    try:
-        from python.runfiles import runfiles
-        r = runfiles.Create()
-        if r:
-            generator_path = r.Rlocation("databuild/databuild/dsl/python/generator")
-        else:
-            raise ImportError("runfiles not available")
-    except ImportError:
-        # Fallback - assume generator is in the same directory as this script
-        import subprocess
-        import shutil
-
-        # Try to find the generator binary using bazel
-        generator_path = shutil.which("bazel")
-        if generator_path:
-            import subprocess
-            result = subprocess.run([
-                "bazel", "run", "//databuild/dsl/python:generator", "--",
-                "{module_path}", "{graph_attr}", output_dir
-            ], cwd=workspace_root if workspace_root else ".", capture_output=True, text=True)
-
-            if result.returncode != 0:
-                print(f"ERROR: Generation failed:", file=sys.stderr)
-                print(result.stderr, file=sys.stderr)
-                sys.exit(1)
-
-            print(result.stdout)
-            sys.exit(0)
-        else:
-            print("ERROR: Could not find bazel to run generator", file=sys.stderr)
-            sys.exit(1)
-
-    # Run the generator with proper module resolution
-    import subprocess
-    result = subprocess.run([
-        generator_path, "{module_path}", "{graph_attr}", output_dir
-    ], capture_output=True, text=True)
-
-    if result.returncode != 0:
-        print(f"ERROR: Generation failed:", file=sys.stderr)
-        print(result.stderr, file=sys.stderr)
-        sys.exit(1)
-
-    print(result.stdout)
-
-except Exception as e:
-    print(f"ERROR: Generation failed: {{e}}", file=sys.stderr)
-    import traceback
-    traceback.print_exc()
-    sys.exit(1)
-""".format(
-        package_path = package_path,
-        module_path = module_path,
-        graph_attr = ctx.attr.graph_attr,
-        name = ctx.attr.name.replace(".generate", ""),
-    )
-
-    ctx.actions.write(
-        output = script,
-        content = script_content,
-        is_executable = True,
-    )
-
-    # Create runfiles with all dependencies
-    runfiles = ctx.runfiles(
-        files = [ctx.file.graph_file, ctx.executable._generator],
-    )
-
-    # Merge runfiles from all dependencies
-    for dep in ctx.attr.deps:
-        if hasattr(dep, "default_runfiles"):
-            runfiles = runfiles.merge(dep.default_runfiles)
-
-    # Add protobuf dependencies
-    if hasattr(ctx.attr._py_proto, "default_runfiles"):
-        runfiles = runfiles.merge(ctx.attr._py_proto.default_runfiles)
-
-    # Add generator runfiles
-    runfiles = runfiles.merge(ctx.attr._generator.default_runfiles)
-
-    # Also add Python runfiles for proper module resolution
-    runfiles = runfiles.merge(ctx.attr._python_runfiles.default_runfiles)
-
-    return [DefaultInfo(
-        executable = script,
-        runfiles = runfiles,
-    )]
-
-_databuild_dsl_generator_rule = rule(
-    implementation = _databuild_dsl_generator_impl,
-    attrs = {
-        "graph_file": attr.label(
-            doc = "Python file containing the DSL graph definition",
-            allow_single_file = [".py"],
-            mandatory = True,
-        ),
-        "graph_attr": attr.string(
-            doc = "Name of the graph attribute in the module",
-            default = "graph",
-        ),
-        "output_package": attr.string(
-            doc = "Target package for generated files",
-            mandatory = True,
-        ),
-        "deps": attr.label_list(
-            doc = "Dependencies needed to load the graph",
-            allow_empty = True,
-        ),
-        "_python_runfiles": attr.label(
-            default = "@rules_python//python/runfiles",
-            allow_files = True,
-        ),
-        "_py_proto": attr.label(
-            default = "//databuild:py_proto",
-        ),
-        "_generator": attr.label(
-            default = "//databuild/dsl/python:generator",
-            executable = True,
-            cfg = "target",
-        ),
-    },
-    executable = True,
-)
@@ -305,27 +305,15 @@ impl BuildGraphService {
             .layer(Extension(api))
             .layer(axum::middleware::from_fn(Self::cors_middleware))
     }

     pub async fn openapi_spec(Extension(api): Extension<OpenApi>) -> Json<OpenApi> {
         Json(api)
     }

-    fn resolve_fpath(fpath: &str) -> String {
-        let standard_prefix = "databuild+";
-        let test_prefix = "_main";
-
-        match (
-            std::fs::read_dir(Self::get_runfile_path(&format!("{}/databuild/dashboard", standard_prefix))),
-            std::fs::read_dir(Self::get_runfile_path(&format!("{}/databuild/dashboard", test_prefix))),
-        ) {
-            (Ok(_), _) => Self::get_runfile_path(&format!("{}/databuild/dashboard/{}", standard_prefix, fpath)),
-            (Err(_), Ok(_)) => Self::get_runfile_path(&format!("{}/databuild/dashboard/{}", test_prefix, fpath)),
-            (_, Err(_)) => panic!("Failed to find dashboard files"),
-        }
-    }
-
     pub async fn serve_index() -> Response {
-        match std::fs::read_to_string(&Self::resolve_fpath("index.html")) {
+        let index_path = Self::get_runfile_path("databuild+/databuild/dashboard/index.html");
+
+        match std::fs::read_to_string(&index_path) {
             Ok(content) => Response::builder()
                 .header("content-type", "text/html")
                 .body(content.into())

@@ -338,7 +326,9 @@ impl BuildGraphService {
     }

     pub async fn serve_static(axum::extract::Path(file): axum::extract::Path<String>) -> Response {
-        match std::fs::read(&Self::resolve_fpath(&file)) {
+        let file_path = Self::get_runfile_path(&format!("databuild+/databuild/dashboard/{}", file));
+
+        match std::fs::read(file_path) {
             Ok(content) => {
                 let content_type = match file.split('.').last() {
                     Some("html") => "text/html",

@@ -362,11 +352,6 @@ impl BuildGraphService {
             .unwrap(),
         }
     }

-    fn get_dashboard_file_path(relative_path: &str) -> String {
-        let runfiles_dir = std::env::var("DASHBOARD_FILES_DIR").unwrap();
-        format!("{}/{}", runfiles_dir, relative_path)
-    }
-
     fn get_runfile_path(relative_path: &str) -> String {
         if let Ok(runfiles_dir) = std::env::var("RUNFILES_DIR") {
@@ -6,7 +6,7 @@ from pathlib import Path

def ref_path(ref: PartitionRef) -> str:
    assert isinstance(ref, PartitionRef), f"Wanted PartitionRef, got `{type(ref)}`"
-    return "/tmp/data/" + ref.str.lstrip("/") + "/data.json"
+    return "data/" + ref.str.lstrip("/") + "/data.json"


def read(*refs: PartitionRef, empty_ok: bool=True) -> list[dict]:
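Note: the only substantive change in this hunk is the data root. A minimal worked example of the resulting paths, using a hypothetical partition ref string:

```python
# Worked example of the ref_path change; the partition ref value is hypothetical.
ref_str = "daily_votes/2024-01-01"

old_path = "/tmp/data/" + ref_str.lstrip("/") + "/data.json"  # bef37cd8: absolute path under /tmp
new_path = "data/" + ref_str.lstrip("/") + "/data.json"       # 3a9fd6a8: relative to the working directory

print(old_path)  # /tmp/data/daily_votes/2024-01-01/data.json
print(new_path)  # data/daily_votes/2024-01-01/data.json
```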
@@ -1,5 +1,3 @@
-load("@databuild//databuild:rules.bzl", "databuild_dsl_generator")
-
py_library(
    name = "dsl_src",
    srcs = glob(
@@ -13,12 +11,3 @@ py_library(
        "//databuild/test/app:job_src",
    ],
)
-
-databuild_dsl_generator(
-    name = "graph.generate",
-    graph_file = "graph.py",
-    graph_attr = "graph",
-    output_package = "//databuild/test/app/dsl",
-    deps = [":dsl_src"],
-    visibility = ["//visibility:public"],
-)
@@ -1,47 +0,0 @@
-#!/usr/bin/env python3
-"""
-Generated job lookup for DataBuild DSL graph.
-Maps partition patterns to job targets.
-"""
-
-import sys
-import re
-
-
-# Mapping from partition patterns to job targets
-JOB_MAPPINGS = {
-    r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":ingest_color_votes",
-    r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":trailing_color_votes",
-    r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":trailing_color_votes",
-    r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
-    r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
-    r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
-    r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":color_vote_report_calc",
-}
-
-
-def lookup_job_for_partition(partition_ref: str) -> str:
-    """Look up which job can build the given partition reference."""
-    for pattern, job_target in JOB_MAPPINGS.items():
-        if re.match(pattern, partition_ref):
-            return job_target
-
-    raise ValueError(f"No job found for partition: {partition_ref}")
-
-
-def main():
-    if len(sys.argv) != 2:
-        print("Usage: job_lookup.py <partition_ref>", file=sys.stderr)
-        sys.exit(1)
-
-    partition_ref = sys.argv[1]
-    try:
-        job_target = lookup_job_for_partition(partition_ref)
-        print(job_target)
-    except ValueError as e:
-        print(f"ERROR: {e}", file=sys.stderr)
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()

@@ -1,118 +0,0 @@
-#!/usr/bin/env python3
-"""
-Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph.
-Configured via environment variables:
-- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph')
-- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes')
-"""
-
-import sys
-import json
-import os
-import importlib
-from typing import List, Any
-from databuild.proto import JobConfig
-
-
-def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]:
-    """Parse partition output references from command line arguments into partition objects."""
-    outputs = []
-    for arg in args:
-        # Find which output type can deserialize this partition reference
-        for output_type in job_class.output_types:
-            try:
-                partition = output_type.deserialize(arg)
-                outputs.append(partition)
-                break
-            except ValueError:
-                continue
-        else:
-            raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}")
-
-    return outputs
-
-
-def main():
-    if len(sys.argv) < 2:
-        print("Usage: dsl_job_wrapper.py <config|exec> [args...]", file=sys.stderr)
-        sys.exit(1)
-
-    command = sys.argv[1]
-
-    # Read configuration from environment
-    graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE')
-    job_class_name = os.environ.get('DATABUILD_JOB_CLASS')
-
-    if not graph_module_path:
-        print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr)
-        sys.exit(1)
-
-    if not job_class_name:
-        print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr)
-        sys.exit(1)
-
-    try:
-        # Import the graph module
-        module = importlib.import_module(graph_module_path)
-        graph = getattr(module, 'graph')
-
-        # Get the job class
-        job_class = getattr(module, job_class_name)
-
-        # Create job instance
-        job_instance = job_class()
-
-    except (ImportError, AttributeError) as e:
-        print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr)
-        sys.exit(1)
-
-    if command == "config":
-        try:
-            # Parse output partition references from remaining args
-            output_refs = sys.argv[2:]
-            if not output_refs:
-                print("ERROR: No output partition references provided", file=sys.stderr)
-                sys.exit(1)
-
-            outputs = parse_outputs_from_args(output_refs, job_class)
-
-            # Call job's config method
-            configs = job_instance.config(outputs)
-
-            # Output each config as JSON (one per line for multiple configs)
-            for config in configs:
-                # Convert JobConfig to dict for JSON serialization
-                config_dict = {
-                    'outputs': [{'str': ref.str} for ref in config.outputs],
-                    'inputs': [
-                        {
-                            'dep_type_code': dep.dep_type_code,
-                            'dep_type_name': dep.dep_type_name,
-                            'partition_ref': {'str': dep.partition_ref.str}
-                        } for dep in config.inputs
-                    ],
-                    'args': config.args,
-                    'env': config.env,
-                }
-                print(json.dumps(config_dict))
-
-        except Exception as e:
-            print(f"ERROR: Config failed: {e}", file=sys.stderr)
-            sys.exit(1)
-
-    elif command == "exec":
-        try:
-            # Call job's exec method
-            job_instance.exec(*sys.argv[2:])
-
-        except Exception as e:
-            print(f"ERROR: Execution failed: {e}", file=sys.stderr)
-            sys.exit(1)
-
-    else:
-        print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr)
-        sys.exit(1)
-
-
-if __name__ == "__main__":
-    main()
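Note: both deleted files are generated helpers. `job_lookup.py` resolves a partition ref to a job target via the first matching regex, and `dsl_job_wrapper.py` is a generic entry point driven by environment variables plus a `config`/`exec` subcommand. A rough usage sketch under those assumptions (module locations, job class, and partition refs are illustrative, not taken from the repo):

```python
# Sketch only: module locations, job class, and partition refs below are illustrative.
import os
import subprocess

# job_lookup.py: map a partition ref to the job target that can build it.
from job_lookup import lookup_job_for_partition
print(lookup_job_for_partition("daily_color_votes/2024-01-01/red"))  # -> ":ingest_color_votes"
print(lookup_job_for_partition("votes_1w/2024-01-01"))               # -> ":aggregate_color_votes"

# dsl_job_wrapper.py: generic job entry point configured through the environment.
env = dict(
    os.environ,
    DATABUILD_DSL_GRAPH_MODULE="databuild.test.app.dsl.graph",
    DATABUILD_JOB_CLASS="IngestColorVotes",
)
# "config" mode prints one JSON-encoded JobConfig per line for the requested outputs.
subprocess.run(
    ["python3", "dsl_job_wrapper.py", "config", "daily_color_votes/2024-01-01/red"],
    env=env, check=True,
)
# "exec" mode runs the job, forwarding the remaining argv to its exec() method.
subprocess.run(["python3", "dsl_job_wrapper.py", "exec"], env=env, check=True)
```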
@@ -17,10 +17,9 @@ from databuild.test.app.dsl.partitions import (
    Votes1MPartition,
    ColorVoteReportPartition
)
-import os
from datetime import date, timedelta

-graph = DataBuildGraph("//databuild/test/app/dsl:dsl_graph")
+graph = DataBuildGraph("//databuild/test/app:dsl_graph")


@graph.job
@@ -34,8 +33,8 @@ class IngestColorVotes(DataBuildJob):
            configs.append(JobConfigBuilder().add_outputs(output).set_env(env).build())
        return configs

-    def exec(self, *args: str) -> None:
-        ingest_color_votes_exec(data_date=os.environ["DATA_DATE"], color=os.environ["COLOR"])
+    def exec(self, config: JobConfig) -> None:
+        ingest_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])


@graph.job
@@ -69,8 +68,8 @@ class TrailingColorVotes(DataBuildJob):
            configs.append(config.build())
        return configs

-    def exec(self, *args: str) -> None:
-        trailing_color_votes_exec(data_date=os.environ["DATA_DATE"], color=os.environ["COLOR"])
+    def exec(self, config: JobConfig) -> None:
+        trailing_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])


@graph.job
@@ -99,8 +98,8 @@ class AggregateColorVotes(DataBuildJob):
        return configs

-    def exec(self, *args: str) -> None:
-        aggregate_color_votes_exec(data_date=os.environ["DATA_DATE"], aggregate_type=os.environ["AGGREGATE_TYPE"])
+    def exec(self, config: JobConfig) -> None:
+        aggregate_color_votes_exec(data_date=config.env["DATA_DATE"], aggregate_type=config.env["AGGREGATE_TYPE"])


@graph.job
@@ -126,6 +125,6 @@ class ColorVoteReportCalc(DataBuildJob):
        return [config.build()]

-    def exec(self, *args: str) -> None:
-        color_vote_report_calc_exec(list(args))
+    def exec(self, config: JobConfig) -> None:
+        color_vote_report_calc_exec(config.args)
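Note: across all four jobs the change is the same: `exec` stops reading the process environment and instead receives the `JobConfig` that `config()` produced. A caller-side sketch of the two conventions (the partition constructor name is hypothetical):

```python
# Caller-side contrast of the two exec conventions; DailyColorVotesPartition is hypothetical.
import os

job = IngestColorVotes()
output = DailyColorVotesPartition(data_date="2024-01-01", color="red")

# bef37cd8: parameters travel via the process environment; exec takes free-form args.
os.environ.update({"DATA_DATE": "2024-01-01", "COLOR": "red"})
job.exec()

# 3a9fd6a8: the JobConfig returned by config() is handed straight back to exec.
config = job.config([output])[0]
job.exec(config)
```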
@@ -141,181 +141,7 @@ bazel run @test_graph//:job_lookup -- \

---

-### Phase 3: Two-Phase Code Generation
+### Phase 3: Graph Integration
-**Goal**: Implement proper two-phase code generation that works within Bazel's constraints
-
-#### Key Learning
-Previous attempts failed due to fundamental Bazel constraints:
-- **Loading vs Execution phases**: `load()` statements run before genrules execute
-- **Dynamic target generation**: Bazel requires the complete build graph before execution begins
-- **Hermeticity**: Generated BUILD files must be in source tree, not bazel-bin
-
-The solution: **Two-phase generation** following established patterns from protobuf, thrift, and other code generators.
-
-#### Two-Phase Workflow
-
-**Phase 1: Code Generation** (run by developer)
-```bash
-bazel run //databuild/test/app/dsl:graph.generate
-# Generates BUILD.bazel and Python binaries into source tree
-```
-
-**Phase 2: Building** (normal Bazel workflow)
-```bash
-bazel build //databuild/test/app/dsl:graph.analyze
-bazel run //databuild/test/app/dsl:graph.service -- --port 8080
-```
-
-#### Implementation Tasks
-
-1. **Create `databuild_dsl_generator` rule**:
-```python
-databuild_dsl_generator(
-    name = "graph.generate",
-    graph_file = "graph.py",
-    output_package = "//databuild/test/app/dsl",
-    deps = [":dsl_src"],
-)
-```
-
-2. **Implement generator that writes to source tree**:
-```python
-def _databuild_dsl_generator_impl(ctx):
-    script = ctx.actions.declare_file(ctx.label.name + "_generator.py")
-
-    # Create a script that:
-    # 1. Loads the DSL graph
-    # 2. Generates BUILD.bazel and binaries
-    # 3. Writes them to the source tree
-    script_content = """
-import os
-import sys
-# Add workspace root to path
-workspace_root = os.environ.get('BUILD_WORKSPACE_DIRECTORY')
-output_dir = os.path.join(workspace_root, '{package_path}')
-
-# Load and generate
-from {module_path} import {graph_attr}
-{graph_attr}.generate_bazel_package('{name}', output_dir)
-print(f'Generated BUILD.bazel and binaries in {{output_dir}}')
-""".format(
-        package_path = ctx.attr.output_package.strip("//").replace(":", "/"),
-        module_path = ctx.file.graph_file.path.replace("/", ".").replace(".py", ""),
-        graph_attr = ctx.attr.graph_attr,
-        name = ctx.attr.name.replace(".generate", ""),
-    )
-
-    ctx.actions.write(
-        output = script,
-        content = script_content,
-        is_executable = True,
-    )
-
-    return [DefaultInfo(executable = script)]
-```
-
-3. **Update `DataBuildGraph.generate_bazel_package()` to target source tree**:
-```python
-def generate_bazel_package(self, name: str, output_dir: str) -> None:
-    """Generate BUILD.bazel and binaries into source directory"""
-    # Generate BUILD.bazel with real databuild targets
-    self._generate_build_bazel(output_dir, name)
-
-    # Generate job binaries
-    self._generate_job_binaries(output_dir)
-
-    # Generate job lookup
-    self._generate_job_lookup(output_dir)
-
-    print(f"Generated package in {output_dir}")
-    print("Run 'bazel build :{name}.analyze' to use")
-```
-
-4. **Create standard BUILD.bazel template**:
-```python
-def _generate_build_bazel(self, output_dir: str, name: str):
-    # Generate proper databuild_job and databuild_graph targets
-    # that will work exactly like hand-written ones
-    build_content = self._build_template.format(
-        jobs = self._format_jobs(),
-        graph_name = f"{name}_graph",
-        job_targets = self._format_job_targets(),
-    )
-
-    with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:
-        f.write(build_content)
-```
-
-#### Interface Design
-
-**For DSL Authors**:
-```python
-# In graph.py
-graph = DataBuildGraph("my_graph")
-
-@graph.job
-class MyJob(DataBuildJob):
-    # ... job definition
-```
-
-**For Users**:
-```bash
-# Generate code (phase 1)
-bazel run //my/app:graph.generate
-
-# Use generated code (phase 2)
-bazel build //my/app:graph.analyze
-bazel run //my/app:graph.service
-```
-
-**In BUILD.bazel**:
-```python
-databuild_dsl_generator(
-    name = "graph.generate",
-    graph_file = "graph.py",
-    output_package = "//my/app",
-    deps = [":my_deps"],
-)
-
-# After generation, this file will contain:
-# databuild_graph(name = "graph_graph", ...)
-# databuild_job(name = "my_job", ...)
-# py_binary(name = "my_job_binary", ...)
-```
-
-#### Benefits of This Approach
-
-✅ **Works within Bazel constraints** - No dynamic target generation
-✅ **Follows established patterns** - Same as protobuf, thrift, OpenAPI generators
-✅ **Inspectable output** - Users can see generated BUILD.bazel
-✅ **Version controllable** - Generated files can be checked in if desired
-✅ **Incremental builds** - Standard Bazel caching works perfectly
-✅ **Clean separation** - Generation vs building are separate phases
-
-#### Tests & Verification
-```bash
-# Test: Code generation
-bazel run //databuild/test/app/dsl:graph.generate
-# Should create BUILD.bazel and Python files in source tree
-
-# Test: Generated targets work
-bazel build //databuild/test/app/dsl:graph_graph.analyze
-# Should build successfully using generated BUILD.bazel
-
-# Test: End-to-end functionality
-bazel run //databuild/test/app/dsl:graph_graph.analyze -- "color_vote_report/2024-01-01/red"
-# Should work exactly like hand-written graph
-```
-
-#### Success Criteria
-- Generator creates valid BUILD.bazel in source tree
-- Generated targets are indistinguishable from hand-written ones
-- Full DataBuild functionality works through generated code
-- Clean developer workflow with clear phase separation
-
----
-
-### Phase 4: Graph Integration
**Goal**: Generate complete databuild graph targets with all operational variants

#### Deliverables