This commit is contained in:
Stuart Axelbrooke 2025-08-04 02:48:29 -07:00
parent 492c30c0bc
commit e8f38399fa
13 changed files with 949 additions and 106 deletions

3
.gitignore vendored
View file

@ -16,3 +16,6 @@ generated_number
target
logs/databuild/
**/logs/databuild/
# DSL generated code
**/generated/

View file

@ -34,13 +34,8 @@ async fn run_analysis(
.map_err(|_| CliError::Environment("DATABUILD_GRAPH_LABEL not set".to_string()))?;
// Find analyze binary using runfiles
let analyze_path = env::var("RUNFILES_DIR")
.map(|runfiles_dir| format!("{}/databuild+/databuild/graph/analyze", runfiles_dir))
.or_else(|_| {
// Fallback for direct execution
Ok("./databuild/graph/analyze".to_string())
})
.map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate analyze binary: {}", e)))?;
let analyze_path = env::var("DATABUILD_ANALYZE_BINARY")
.map_err(|_| CliError::Environment("DATABUILD_ANALYZE_BINARY not set".to_string()))?;
// Build analyze command
let cmd = Command::new(analyze_path)
@ -88,13 +83,8 @@ async fn run_execution(
let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").unwrap_or_else(|_| "stdout".to_string());
// Find execute binary using runfiles
let execute_path = env::var("RUNFILES_DIR")
.map(|runfiles_dir| format!("{}/databuild+/databuild/graph/execute", runfiles_dir))
.or_else(|_| {
// Fallback for direct execution
Ok("./databuild/graph/execute".to_string())
})
.map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate execute binary: {}", e)))?;
let execute_path = env::var("DATABUILD_EXECUTE_BINARY")
.map_err(|_| CliError::Environment("DATABUILD_EXECUTE_BINARY not set".to_string()))?;
// Build execute command
let mut cmd = Command::new(execute_path)

View file

@ -6,3 +6,17 @@ py_library(
"//databuild:py_proto",
],
)
py_binary(
name = "generator",
srcs = ["generator.py"],
main = "generator.py",
data = ["dsl_job_wrapper.py"],
visibility = ["//visibility:public"],
deps = [
":dsl",
"//databuild:py_proto",
# Include test app modules for testing
"//databuild/test/app/dsl:dsl_src",
],
)

View file

@ -84,6 +84,291 @@ class DataBuildGraph:
"""Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
raise NotImplementedError
def generate_bazel_package(self, name: str, output_dir: str) -> None:
"""Generate BUILD.bazel and binaries into a generated/ subdirectory.
Args:
name: Base name for the generated graph (without .generate suffix)
output_dir: Directory to write generated files to (will create generated/ subdir)
"""
import os
import shutil
# Create generated/ subdirectory
generated_dir = os.path.join(output_dir, "generated")
os.makedirs(generated_dir, exist_ok=True)
# Generate BUILD.bazel with job and graph targets
self._generate_build_bazel(generated_dir, name)
# Generate individual job scripts (instead of shared wrapper)
self._generate_job_scripts(generated_dir)
# Generate job lookup binary
self._generate_job_lookup(generated_dir, name)
package_name = self._get_package_name()
print(f"Generated DataBuild package '{name}' in {generated_dir}")
if package_name != "UNKNOWN_PACKAGE":
print(f"Run 'bazel build //{package_name}/generated:{name}_graph.analyze' to use the generated graph")
else:
print(f"Run 'bazel build generated:{name}_graph.analyze' to use the generated graph")
def _generate_build_bazel(self, output_dir: str, name: str) -> None:
"""Generate BUILD.bazel with databuild_job and databuild_graph targets."""
import os
# Get job classes from the lookup table
job_classes = list(set(self.lookup.values()))
# Get parent package for dependencies
parent_package = self._get_package_name()
# Generate py_binary targets for each job
job_binaries = []
job_targets = []
for job_class in job_classes:
job_name = self._snake_case(job_class.__name__)
binary_name = f"{job_name}_binary"
job_targets.append(f'"{job_name}"')
job_script_name = f"{job_name}.py"
job_binaries.append(f'''py_binary(
name = "{binary_name}",
srcs = ["{job_script_name}"],
main = "{job_script_name}",
deps = ["//{parent_package}:dsl_src"],
)
databuild_job(
name = "{job_name}",
binary = ":{binary_name}",
)''')
# Generate the complete BUILD.bazel content
build_content = f'''load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")
# Generated by DataBuild DSL - do not edit manually
# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel
{chr(10).join(job_binaries)}
py_binary(
name = "{name}_job_lookup",
srcs = ["{name}_job_lookup.py"],
deps = ["//{parent_package}:dsl_src"],
)
databuild_graph(
name = "{name}_graph",
jobs = [{", ".join(job_targets)}],
lookup = ":{name}_job_lookup",
visibility = ["//visibility:public"],
)
'''
with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:
f.write(build_content)
def _generate_job_scripts(self, output_dir: str) -> None:
"""Generate individual Python scripts for each job class."""
import os
# Get job classes and generate a script for each one
job_classes = list(set(self.lookup.values()))
graph_module_path = self._get_graph_module_path()
for job_class in job_classes:
job_name = self._snake_case(job_class.__name__)
script_name = f"{job_name}.py"
script_content = f'''#!/usr/bin/env python3
"""
Generated job script for {job_class.__name__}.
"""
import sys
import json
from {graph_module_path} import {job_class.__name__}
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
def parse_outputs_from_args(args: list[str]) -> list:
"""Parse partition output references from command line arguments."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in {job_class.__name__}.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {{arg}}")
return outputs
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception(f"Invalid command usage")
command = sys.argv[1]
job_instance = {job_class.__name__}()
if command == "config":
# Parse output partition references as PartitionRef objects (for Rust wrapper)
output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
# Also parse them into DSL partition objects (for DSL job.config())
outputs = parse_outputs_from_args(sys.argv[2:])
# Call job's config method - returns list[JobConfig]
configs = job_instance.config(outputs)
# Wrap in JobConfigureResponse and serialize using to_dict()
response = JobConfigureResponse(configs=configs)
print(json.dumps(to_dict(response)))
elif command == "exec":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(sys.argv[2:])
else:
raise Exception(f"Invalid command `{{sys.argv[1]}}`")
'''
script_path = os.path.join(output_dir, script_name)
with open(script_path, "w") as f:
f.write(script_content)
# Make it executable
os.chmod(script_path, 0o755)
def _generate_job_lookup(self, output_dir: str, name: str) -> None:
"""Generate job lookup binary that maps partition patterns to job targets."""
import os
# Build the job lookup mappings with full package paths
package_name = self._get_package_name()
lookup_mappings = []
for partition_type, job_class in self.lookup.items():
job_name = self._snake_case(job_class.__name__)
pattern = partition_type._raw_pattern
full_target = f"//{package_name}/generated:{job_name}"
lookup_mappings.append(f' r"{pattern}": "{full_target}",')
lookup_content = f'''#!/usr/bin/env python3
"""
Generated job lookup for DataBuild DSL graph.
Maps partition patterns to job targets.
"""
import sys
import re
import json
from collections import defaultdict
# Mapping from partition patterns to job targets
JOB_MAPPINGS = {{
{chr(10).join(lookup_mappings)}
}}
def lookup_job_for_partition(partition_ref: str) -> str:
"""Look up which job can build the given partition reference."""
for pattern, job_target in JOB_MAPPINGS.items():
if re.match(pattern, partition_ref):
return job_target
raise ValueError(f"No job found for partition: {{partition_ref}}")
def main():
if len(sys.argv) < 2:
print("Usage: job_lookup.py <partition_ref> [partition_ref...]", file=sys.stderr)
sys.exit(1)
results = defaultdict(list)
try:
for partition_ref in sys.argv[1:]:
job_target = lookup_job_for_partition(partition_ref)
results[job_target].append(partition_ref)
# Output the results as JSON (matching existing lookup format)
print(json.dumps(dict(results)))
except ValueError as e:
print(f"ERROR: {{e}}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
'''
lookup_file = os.path.join(output_dir, f"{name}_job_lookup.py")
with open(lookup_file, "w") as f:
f.write(lookup_content)
# Make it executable
os.chmod(lookup_file, 0o755)
def _snake_case(self, name: str) -> str:
"""Convert CamelCase to snake_case."""
import re
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
def _get_graph_module_path(self) -> str:
"""Get the module path for the graph containing this instance."""
# Try to find the module by looking at where the graph object is defined
import inspect
import sys
# Look through all loaded modules to find where this graph instance is defined
for module_name, module in sys.modules.items():
if hasattr(module, 'graph') and getattr(module, 'graph') is self:
if module_name != '__main__':
return module_name
# Look through the call stack to find the module that imported us
for frame_info in inspect.stack():
frame_globals = frame_info.frame.f_globals
module_name = frame_globals.get('__name__')
if module_name and module_name != '__main__' and 'graph' in frame_globals:
# Check if this frame has our graph
if frame_globals.get('graph') is self:
return module_name
# Last resort fallback - this will need to be manually configured
return "UNKNOWN_MODULE"
def _get_package_name(self) -> str:
"""Get the Bazel package name where the DSL source files are located."""
# Extract package from the graph label if available
if hasattr(self, 'label') and self.label.startswith('//'):
# Extract package from label like "//databuild/test/app:dsl_graph"
package_part = self.label.split(':')[0]
return package_part[2:] # Remove "//" prefix
# Fallback to trying to infer from module path
module_path = self._get_graph_module_path()
if module_path != "UNKNOWN_MODULE":
# Convert module path to package path
# e.g., "databuild.test.app.dsl.graph" -> "databuild/test/app/dsl"
parts = module_path.split('.')
if parts[-1] in ['graph', 'main']:
parts = parts[:-1]
return '/'.join(parts)
return "UNKNOWN_PACKAGE"
@dataclass
class JobConfigBuilder:

View file

@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph.
Configured via environment variables:
- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph')
- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes')
"""
import sys
import json
import os
import importlib
from typing import List, Any
from databuild.proto import JobConfig
def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]:
"""Parse partition output references from command line arguments into partition objects."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in job_class.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}")
return outputs
def main():
if len(sys.argv) < 2:
print("Usage: dsl_job_wrapper.py <config|exec> [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
# Read configuration from environment
graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE')
job_class_name = os.environ.get('DATABUILD_JOB_CLASS')
if not graph_module_path:
print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr)
sys.exit(1)
if not job_class_name:
print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr)
sys.exit(1)
try:
# Import the graph module
module = importlib.import_module(graph_module_path)
graph = getattr(module, 'graph')
# Get the job class
job_class = getattr(module, job_class_name)
# Create job instance
job_instance = job_class()
except (ImportError, AttributeError) as e:
print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr)
sys.exit(1)
if command == "config":
try:
# Parse output partition references from remaining args
output_refs = sys.argv[2:]
if not output_refs:
print("ERROR: No output partition references provided", file=sys.stderr)
sys.exit(1)
outputs = parse_outputs_from_args(output_refs, job_class)
# Call job's config method
configs = job_instance.config(outputs)
# Output each config as JSON (one per line for multiple configs)
for config in configs:
# Convert JobConfig to dict for JSON serialization
config_dict = {
'outputs': [{'str': ref.str} for ref in config.outputs],
'inputs': [
{
'dep_type_code': dep.dep_type_code,
'dep_type_name': dep.dep_type_name,
'partition_ref': {'str': dep.partition_ref.str}
} for dep in config.inputs
],
'args': config.args,
'env': config.env,
}
print(json.dumps(config_dict))
except Exception as e:
print(f"ERROR: Config failed: {e}", file=sys.stderr)
sys.exit(1)
elif command == "exec":
try:
# Read config from stdin
config_json = sys.stdin.read().strip()
if not config_json:
print("ERROR: No config provided on stdin", file=sys.stderr)
sys.exit(1)
config_dict = json.loads(config_json)
# Convert dict back to JobConfig
from databuild.proto import PartitionRef, DataDep, DepType
config = JobConfig(
outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']],
inputs=[
DataDep(
dep_type_code=dep['dep_type_code'],
dep_type_name=dep['dep_type_name'],
partition_ref=PartitionRef(str=dep['partition_ref']['str'])
) for dep in config_dict['inputs']
],
args=config_dict['args'],
env=config_dict['env'],
)
# Call job's exec method
job_instance.exec(config)
except Exception as e:
print(f"ERROR: Execution failed: {e}", file=sys.stderr)
sys.exit(1)
else:
print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
DSL code generator that can be run as a py_binary with proper dependencies.
"""
import sys
import os
import importlib
def main():
if len(sys.argv) != 4:
print("Usage: generator.py <module_path> <graph_attr> <output_dir>", file=sys.stderr)
sys.exit(1)
module_path = sys.argv[1]
graph_attr = sys.argv[2]
output_dir = sys.argv[3]
# Extract the base name from the output directory for naming
name = os.path.basename(output_dir.rstrip('/')) or "graph"
try:
# Import the graph module
module = importlib.import_module(module_path)
graph = getattr(module, graph_attr)
# Generate the bazel package
graph.generate_bazel_package(name, output_dir)
print(f"Generated DataBuild DSL package in {output_dir}")
except ImportError as e:
print(f"ERROR: Failed to import {graph_attr} from {module_path}: {e}", file=sys.stderr)
sys.exit(1)
except AttributeError as e:
print(f"ERROR: Module {module_path} does not have attribute {graph_attr}: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"ERROR: Generation failed: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -37,6 +37,14 @@ pub struct SqliteBuildEventLog {
impl SqliteBuildEventLog {
pub async fn new(path: &str) -> Result<Self> {
// Create parent directory if it doesn't exist
if let Some(parent) = std::path::Path::new(path).parent() {
std::fs::create_dir_all(parent)
.map_err(|e| BuildEventLogError::ConnectionError(
format!("Failed to create directory {}: {}", parent.display(), e)
))?;
}
let conn = Connection::open(path)
.map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;

View file

@ -9,7 +9,7 @@ use std::io::{BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::thread;
use std::{env, thread};
use std::time::{Duration, Instant};
// Command line parsing removed - using environment variables
use uuid::Uuid;
@ -52,48 +52,6 @@ fn get_task_key(task: &Task) -> String {
key_parts.join("|")
}
// Resolves the executable path from runfiles.
// Mirrors the Go implementation's resolveExecutableFromRunfiles.
fn resolve_executable_from_runfiles(job_label: &str) -> PathBuf {
let mut target_name = job_label.to_string();
if let Some(colon_index) = job_label.rfind(':') {
target_name = job_label[colon_index + 1..].to_string();
} else if let Some(name) = Path::new(job_label).file_name().and_then(|n| n.to_str()) {
target_name = name.to_string();
}
let exec_name = format!("{}.exec", target_name);
if let Ok(runfiles_dir_str) = std::env::var("RUNFILES_DIR") {
let path = PathBuf::from(runfiles_dir_str).join("_main").join(&exec_name);
debug!("Resolved executable path (RUNFILES_DIR): {}", path.display());
return path;
}
if let Ok(current_exe) = std::env::current_exe() {
let mut runfiles_dir_path = PathBuf::from(format!("{}.runfiles", current_exe.display()));
if !runfiles_dir_path.is_dir() { // Bazel often puts it next to the binary
if let Some(parent) = current_exe.parent() {
runfiles_dir_path = parent.join(format!("{}.runfiles", current_exe.file_name().unwrap_or_default().to_string_lossy()));
}
}
if runfiles_dir_path.is_dir() {
let path = runfiles_dir_path.join("_main").join(&exec_name);
debug!("Resolved executable path (derived RUNFILES_DIR): {}", path.display());
return path;
} else {
warn!("Warning: RUNFILES_DIR not found or invalid, and derived path {} is not a directory.", runfiles_dir_path.display());
}
} else {
warn!("Warning: Could not determine current executable path.");
}
let fallback_path = PathBuf::from(format!("{}.exec", job_label));
warn!("Falling back to direct executable path: {}", fallback_path.display());
fallback_path
}
fn worker(
task_rx: Receiver<Arc<Task>>,
result_tx: Sender<TaskExecutionResult>,
@ -105,7 +63,8 @@ fn worker(
info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key);
let start_time = Instant::now();
let exec_path = resolve_executable_from_runfiles(&task.job.as_ref().unwrap().label);
let exec_path = env::var("DATABUILD_EXECUTE_BINARY")
.map_err(|e| Error::from(format!("Env var DATABUILD_EXECUTE_BINARY not set: {}", e)));
let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) {
Ok(json) => json,

View file

@ -651,21 +651,52 @@ def _databuild_graph_build_impl(ctx):
script_content = RUNFILES_PREFIX + """
# Build DATABUILD_CANDIDATE_JOBS dynamically with proper rlocation resolution
%s
{candidate_jobs_script}
export DATABUILD_CANDIDATE_JOBS="$CANDIDATE_JOBS_JSON"
export DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/%s)"
export DATABUILD_GRAPH_LABEL="%s"
# Resolve binary paths with error checking
DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/{lookup_path})"
if [[ -z "$DATABUILD_JOB_LOOKUP_PATH" || ! -f "$DATABUILD_JOB_LOOKUP_PATH" ]]; then
echo "ERROR: Could not find job lookup binary at _main/{lookup_path}" >&2
exit 1
fi
export DATABUILD_JOB_LOOKUP_PATH
DATABUILD_ANALYZE_BINARY="$(rlocation _main/{analyze_path})"
if [[ -z "$DATABUILD_ANALYZE_BINARY" || ! -f "$DATABUILD_ANALYZE_BINARY" ]]; then
echo "ERROR: Could not find analyze binary at _main/{analyze_path}" >&2
exit 1
fi
export DATABUILD_ANALYZE_BINARY
DATABUILD_EXECUTE_BINARY="$(rlocation _main/{execute_path})"
if [[ -z "$DATABUILD_EXECUTE_BINARY" || ! -f "$DATABUILD_EXECUTE_BINARY" ]]; then
echo "ERROR: Could not find execute binary at _main/{execute_path}" >&2
exit 1
fi
export DATABUILD_EXECUTE_BINARY
export DATABUILD_GRAPH_LABEL="{graph_label}"
# Generate a single build request ID for the entire CLI operation
export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())")
# Run unified DataBuild CLI wrapper
"$(rlocation databuild+/databuild/cli/databuild_cli)" "$@"
""" % (
candidate_jobs_script,
ctx.attr.lookup.files_to_run.executable.short_path,
ctx.attr.graph_label,
# Run unified DataBuild CLI wrapper using the provided cli_wrapper attribute
CLI_BINARY="$(rlocation _main/{cli_path})"
if [[ -z "$CLI_BINARY" || ! -f "$CLI_BINARY" ]]; then
echo "ERROR: Could not find CLI binary at _main/{cli_path}" >&2
exit 1
fi
"$CLI_BINARY" "$@"
""".format(
candidate_jobs_script = candidate_jobs_script,
lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
analyze_path = ctx.attr._analyze.files_to_run.executable.short_path,
execute_path = ctx.attr._execute.files_to_run.executable.short_path,
graph_label = ctx.attr.graph_label,
cli_path = ctx.attr.cli_wrapper.files_to_run.executable.short_path,
)
ctx.actions.write(
@ -674,58 +705,29 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
content = script_content,
)
# Gather the configure and execute executables
# Gather the configure executables
configure_executables = [
job[DataBuildJobInfo].configure.files_to_run.executable
for job in ctx.attr.jobs
]
# Get the execute targets - these are the .exec files that need to be in runfiles
execute_executables = []
for job in ctx.attr.jobs:
# The job target itself contains references to both configure and execute
# We need to find the .exec target for each job
job_name = job.label.name
exec_target_name = job_name + ".exec"
# Find the .exec target in the same package
for attr_name in dir(job):
if attr_name.endswith("_exec") or exec_target_name in attr_name:
exec_target = getattr(job, attr_name, None)
if exec_target and hasattr(exec_target, "files_to_run"):
execute_executables.append(exec_target.files_to_run.executable)
break
# Also check if we can access exec targets directly from job dependencies
all_job_files = []
for job in ctx.attr.jobs:
if hasattr(job, "default_runfiles") and job.default_runfiles:
all_job_files.extend(job.default_runfiles.files.to_list())
# Create runfiles including the CLI binary, analyze/execute binaries and all dependencies
runfiles = ctx.runfiles(
files = [ctx.executable.cli_wrapper, ctx.executable.lookup] + configure_executables + execute_executables + all_job_files,
files = [ctx.executable.cli_wrapper, ctx.executable.lookup, ctx.executable._analyze, ctx.executable._execute] + configure_executables,
).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles,
)
ctx.attr._analyze.default_runfiles,
).merge(ctx.attr._execute.default_runfiles).merge(ctx.attr._bash_runfiles.default_runfiles).merge_all([job.default_runfiles for job in ctx.attr.jobs])
# Merge runfiles from all configure targets and job targets
# Merge runfiles from all configure targets
for job in ctx.attr.jobs:
configure_target = job[DataBuildJobInfo].configure
runfiles = runfiles.merge(configure_target.default_runfiles)
# Also merge the job's own runfiles which should include the .exec target
runfiles = runfiles.merge(job.default_runfiles)
return [
DefaultInfo(
executable = script,
runfiles = runfiles,
),
DataBuildGraphInfo(
analyze = ctx.attr.cli_wrapper,
exec = ctx.attr.cli_wrapper,
jobs = ctx.attr.jobs,
),
]
_databuild_graph_build = rule(
@ -755,6 +757,16 @@ _databuild_graph_build = rule(
default = Label("@bazel_tools//tools/bash/runfiles"),
allow_files = True,
),
"_analyze": attr.label(
default = "@databuild//databuild/graph:analyze",
executable = True,
cfg = "target",
),
"_execute": attr.label(
default = "@databuild//databuild/graph:execute",
executable = True,
cfg = "target",
),
},
executable = True,
)
@ -902,3 +914,186 @@ _databuild_graph_service = rule(
},
executable = True,
)
def databuild_dsl_generator(
name,
graph_file,
graph_attr = "graph",
output_package = None,
deps = [],
visibility = None):
"""Creates a DataBuild DSL code generator that can generate BUILD.bazel and job binaries.
Args:
name: Name of the generator target (typically ends with .generate)
graph_file: Python file containing the DSL graph definition
graph_attr: Name of the graph attribute in the module (default: "graph")
output_package: Target package for generated files (default: current package)
deps: Dependencies needed to load the graph
visibility: Visibility specification
"""
if not output_package:
output_package = "//" + native.package_name()
_databuild_dsl_generator_rule(
name = name,
graph_file = graph_file,
graph_attr = graph_attr,
output_package = output_package,
deps = deps,
visibility = visibility,
)
def _databuild_dsl_generator_impl(ctx):
"""Implementation of the DSL generator rule."""
script = ctx.actions.declare_file(ctx.label.name + "_generator.py")
# Get the module path from the graph file
graph_file_path = ctx.file.graph_file.short_path
if graph_file_path.endswith(".py"):
graph_file_path = graph_file_path[:-3]
module_path = graph_file_path.replace("/", ".")
# Get the package path for output
package_path = ctx.attr.output_package.strip("//").replace(":", "/")
script_content = """#!/usr/bin/env python3
import os
import sys
# Determine output directory
workspace_root = os.environ.get('BUILD_WORKSPACE_DIRECTORY')
if workspace_root:
# Running with bazel run - write to source tree
output_dir = os.path.join(workspace_root, '{package_path}')
else:
# Running with bazel build - write to current directory (bazel-bin)
output_dir = '.'
print(f"Generating DataBuild DSL code to {{output_dir}}")
try:
# Setup runfiles for module resolution
try:
from python.runfiles import runfiles
r = runfiles.Create()
if r:
generator_path = r.Rlocation("databuild/databuild/dsl/python/generator")
else:
raise ImportError("runfiles not available")
except ImportError:
# Fallback - assume generator is in the same directory as this script
import subprocess
import shutil
# Try to find the generator binary using bazel
generator_path = shutil.which("bazel")
if generator_path:
import subprocess
result = subprocess.run([
"bazel", "run", "//databuild/dsl/python:generator", "--",
"{module_path}", "{graph_attr}", output_dir
], cwd=workspace_root if workspace_root else ".", capture_output=True, text=True)
if result.returncode != 0:
print(f"ERROR: Generation failed:", file=sys.stderr)
print(result.stderr, file=sys.stderr)
sys.exit(1)
print(result.stdout)
sys.exit(0)
else:
print("ERROR: Could not find bazel to run generator", file=sys.stderr)
sys.exit(1)
# Run the generator with proper module resolution
import subprocess
result = subprocess.run([
generator_path, "{module_path}", "{graph_attr}", output_dir
], capture_output=True, text=True)
if result.returncode != 0:
print(f"ERROR: Generation failed:", file=sys.stderr)
print(result.stderr, file=sys.stderr)
sys.exit(1)
print(result.stdout)
except Exception as e:
print(f"ERROR: Generation failed: {{e}}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
""".format(
package_path = package_path,
module_path = module_path,
graph_attr = ctx.attr.graph_attr,
name = ctx.attr.name.replace(".generate", ""),
)
ctx.actions.write(
output = script,
content = script_content,
is_executable = True,
)
# Create runfiles with all dependencies
runfiles = ctx.runfiles(
files = [ctx.file.graph_file, ctx.executable._generator],
)
# Merge runfiles from all dependencies
for dep in ctx.attr.deps:
if hasattr(dep, "default_runfiles"):
runfiles = runfiles.merge(dep.default_runfiles)
# Add protobuf dependencies
if hasattr(ctx.attr._py_proto, "default_runfiles"):
runfiles = runfiles.merge(ctx.attr._py_proto.default_runfiles)
# Add generator runfiles
runfiles = runfiles.merge(ctx.attr._generator.default_runfiles)
# Also add Python runfiles for proper module resolution
runfiles = runfiles.merge(ctx.attr._python_runfiles.default_runfiles)
return [DefaultInfo(
executable = script,
runfiles = runfiles,
)]
_databuild_dsl_generator_rule = rule(
implementation = _databuild_dsl_generator_impl,
attrs = {
"graph_file": attr.label(
doc = "Python file containing the DSL graph definition",
allow_single_file = [".py"],
mandatory = True,
),
"graph_attr": attr.string(
doc = "Name of the graph attribute in the module",
default = "graph",
),
"output_package": attr.string(
doc = "Target package for generated files",
mandatory = True,
),
"deps": attr.label_list(
doc = "Dependencies needed to load the graph",
allow_empty = True,
),
"_python_runfiles": attr.label(
default = "@rules_python//python/runfiles",
allow_files = True,
),
"_py_proto": attr.label(
default = "//databuild:py_proto",
),
"_generator": attr.label(
default = "//databuild/dsl/python:generator",
executable = True,
cfg = "target",
),
},
executable = True,
)

View file

@ -1,3 +1,5 @@
load("@databuild//databuild:rules.bzl", "databuild_dsl_generator")
py_library(
name = "dsl_src",
srcs = glob(
@ -11,3 +13,12 @@ py_library(
"//databuild/test/app:job_src",
],
)
databuild_dsl_generator(
name = "graph.generate",
graph_file = "graph.py",
graph_attr = "graph",
output_package = "//databuild/test/app/dsl",
deps = [":dsl_src"],
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
Generated job lookup for DataBuild DSL graph.
Maps partition patterns to job targets.
"""
import sys
import re
# Mapping from partition patterns to job targets
JOB_MAPPINGS = {
r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":ingest_color_votes",
r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":trailing_color_votes",
r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":trailing_color_votes",
r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})": ":aggregate_color_votes",
r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": ":color_vote_report_calc",
}
def lookup_job_for_partition(partition_ref: str) -> str:
"""Look up which job can build the given partition reference."""
for pattern, job_target in JOB_MAPPINGS.items():
if re.match(pattern, partition_ref):
return job_target
raise ValueError(f"No job found for partition: {partition_ref}")
def main():
if len(sys.argv) != 2:
print("Usage: job_lookup.py <partition_ref>", file=sys.stderr)
sys.exit(1)
partition_ref = sys.argv[1]
try:
job_target = lookup_job_for_partition(partition_ref)
print(job_target)
except ValueError as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph.
Configured via environment variables:
- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph')
- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes')
"""
import sys
import json
import os
import importlib
from typing import List, Any
from databuild.proto import JobConfig
def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]:
"""Parse partition output references from command line arguments into partition objects."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in job_class.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}")
return outputs
def main():
if len(sys.argv) < 2:
print("Usage: dsl_job_wrapper.py <config|exec> [args...]", file=sys.stderr)
sys.exit(1)
command = sys.argv[1]
# Read configuration from environment
graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE')
job_class_name = os.environ.get('DATABUILD_JOB_CLASS')
if not graph_module_path:
print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr)
sys.exit(1)
if not job_class_name:
print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr)
sys.exit(1)
try:
# Import the graph module
module = importlib.import_module(graph_module_path)
graph = getattr(module, 'graph')
# Get the job class
job_class = getattr(module, job_class_name)
# Create job instance
job_instance = job_class()
except (ImportError, AttributeError) as e:
print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr)
sys.exit(1)
if command == "config":
try:
# Parse output partition references from remaining args
output_refs = sys.argv[2:]
if not output_refs:
print("ERROR: No output partition references provided", file=sys.stderr)
sys.exit(1)
outputs = parse_outputs_from_args(output_refs, job_class)
# Call job's config method
configs = job_instance.config(outputs)
# Output each config as JSON (one per line for multiple configs)
for config in configs:
# Convert JobConfig to dict for JSON serialization
config_dict = {
'outputs': [{'str': ref.str} for ref in config.outputs],
'inputs': [
{
'dep_type_code': dep.dep_type_code,
'dep_type_name': dep.dep_type_name,
'partition_ref': {'str': dep.partition_ref.str}
} for dep in config.inputs
],
'args': config.args,
'env': config.env,
}
print(json.dumps(config_dict))
except Exception as e:
print(f"ERROR: Config failed: {e}", file=sys.stderr)
sys.exit(1)
elif command == "exec":
try:
# Read config from stdin
config_json = sys.stdin.read().strip()
if not config_json:
print("ERROR: No config provided on stdin", file=sys.stderr)
sys.exit(1)
config_dict = json.loads(config_json)
# Convert dict back to JobConfig
from databuild.proto import PartitionRef, DataDep, DepType
config = JobConfig(
outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']],
inputs=[
DataDep(
dep_type_code=dep['dep_type_code'],
dep_type_name=dep['dep_type_name'],
partition_ref=PartitionRef(str=dep['partition_ref']['str'])
) for dep in config_dict['inputs']
],
args=config_dict['args'],
env=config_dict['env'],
)
# Call job's exec method
job_instance.exec(config)
except Exception as e:
print(f"ERROR: Execution failed: {e}", file=sys.stderr)
sys.exit(1)
else:
print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -19,7 +19,7 @@ from databuild.test.app.dsl.partitions import (
)
from datetime import date, timedelta
graph = DataBuildGraph("//databuild/test/app:dsl_graph")
graph = DataBuildGraph("//databuild/test/app/dsl:dsl_graph")
@graph.job