diff --git a/.gitignore b/.gitignore index 65b2cb8..7a487fd 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ generated_number target logs/databuild/ **/logs/databuild/ + +# DSL generated code +**/generated/ diff --git a/databuild/cli/main.rs b/databuild/cli/main.rs index ebf045d..656c9dd 100644 --- a/databuild/cli/main.rs +++ b/databuild/cli/main.rs @@ -32,16 +32,11 @@ async fn run_analysis( .map_err(|_| CliError::Environment("DATABUILD_JOB_LOOKUP_PATH not set".to_string()))?; let graph_label = env::var("DATABUILD_GRAPH_LABEL") .map_err(|_| CliError::Environment("DATABUILD_GRAPH_LABEL not set".to_string()))?; - + // Find analyze binary using runfiles - let analyze_path = env::var("RUNFILES_DIR") - .map(|runfiles_dir| format!("{}/databuild+/databuild/graph/analyze", runfiles_dir)) - .or_else(|_| { - // Fallback for direct execution - Ok("./databuild/graph/analyze".to_string()) - }) - .map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate analyze binary: {}", e)))?; - + let analyze_path = env::var("DATABUILD_ANALYZE_BINARY") + .map_err(|_| CliError::Environment("DATABUILD_ANALYZE_BINARY not set".to_string()))?; + // Build analyze command let cmd = Command::new(analyze_path) .args(partitions) @@ -88,13 +83,8 @@ async fn run_execution( let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").unwrap_or_else(|_| "stdout".to_string()); // Find execute binary using runfiles - let execute_path = env::var("RUNFILES_DIR") - .map(|runfiles_dir| format!("{}/databuild+/databuild/graph/execute", runfiles_dir)) - .or_else(|_| { - // Fallback for direct execution - Ok("./databuild/graph/execute".to_string()) - }) - .map_err(|e: std::env::VarError| CliError::Environment(format!("Failed to locate execute binary: {}", e)))?; + let execute_path = env::var("DATABUILD_EXECUTE_BINARY") + .map_err(|_| CliError::Environment("DATABUILD_EXECUTE_BINARY not set".to_string()))?; // Build execute command let mut cmd = Command::new(execute_path) diff --git a/databuild/dsl/python/BUILD.bazel b/databuild/dsl/python/BUILD.bazel index 21c63f6..e36b153 100644 --- a/databuild/dsl/python/BUILD.bazel +++ b/databuild/dsl/python/BUILD.bazel @@ -6,3 +6,17 @@ py_library( "//databuild:py_proto", ], ) + +py_binary( + name = "generator", + srcs = ["generator.py"], + main = "generator.py", + data = ["dsl_job_wrapper.py"], + visibility = ["//visibility:public"], + deps = [ + ":dsl", + "//databuild:py_proto", + # Include test app modules for testing + "//databuild/test/app/dsl:dsl_src", + ], +) diff --git a/databuild/dsl/python/dsl.py b/databuild/dsl/python/dsl.py index e997ed3..a95f33a 100644 --- a/databuild/dsl/python/dsl.py +++ b/databuild/dsl/python/dsl.py @@ -84,6 +84,291 @@ class DataBuildGraph: """Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets""" raise NotImplementedError + def generate_bazel_package(self, name: str, output_dir: str) -> None: + """Generate BUILD.bazel and binaries into a generated/ subdirectory. 
+ + Args: + name: Base name for the generated graph (without .generate suffix) + output_dir: Directory to write generated files to (will create generated/ subdir) + """ + import os + import shutil + + # Create generated/ subdirectory + generated_dir = os.path.join(output_dir, "generated") + os.makedirs(generated_dir, exist_ok=True) + + # Generate BUILD.bazel with job and graph targets + self._generate_build_bazel(generated_dir, name) + + # Generate individual job scripts (instead of shared wrapper) + self._generate_job_scripts(generated_dir) + + # Generate job lookup binary + self._generate_job_lookup(generated_dir, name) + + package_name = self._get_package_name() + print(f"Generated DataBuild package '{name}' in {generated_dir}") + if package_name != "UNKNOWN_PACKAGE": + print(f"Run 'bazel build //{package_name}/generated:{name}_graph.analyze' to use the generated graph") + else: + print(f"Run 'bazel build generated:{name}_graph.analyze' to use the generated graph") + + def _generate_build_bazel(self, output_dir: str, name: str) -> None: + """Generate BUILD.bazel with databuild_job and databuild_graph targets.""" + import os + + # Get job classes from the lookup table + job_classes = list(set(self.lookup.values())) + + # Get parent package for dependencies + parent_package = self._get_package_name() + + # Generate py_binary targets for each job + job_binaries = [] + job_targets = [] + + for job_class in job_classes: + job_name = self._snake_case(job_class.__name__) + binary_name = f"{job_name}_binary" + job_targets.append(f'"{job_name}"') + + job_script_name = f"{job_name}.py" + job_binaries.append(f'''py_binary( + name = "{binary_name}", + srcs = ["{job_script_name}"], + main = "{job_script_name}", + deps = ["//{parent_package}:dsl_src"], +) + +databuild_job( + name = "{job_name}", + binary = ":{binary_name}", +)''') + + # Generate the complete BUILD.bazel content + build_content = f'''load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph") + +# Generated by DataBuild DSL - do not edit manually +# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel + +{chr(10).join(job_binaries)} + +py_binary( + name = "{name}_job_lookup", + srcs = ["{name}_job_lookup.py"], + deps = ["//{parent_package}:dsl_src"], +) + +databuild_graph( + name = "{name}_graph", + jobs = [{", ".join(job_targets)}], + lookup = ":{name}_job_lookup", + visibility = ["//visibility:public"], +) +''' + + with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f: + f.write(build_content) + + def _generate_job_scripts(self, output_dir: str) -> None: + """Generate individual Python scripts for each job class.""" + import os + + # Get job classes and generate a script for each one + job_classes = list(set(self.lookup.values())) + graph_module_path = self._get_graph_module_path() + + for job_class in job_classes: + job_name = self._snake_case(job_class.__name__) + script_name = f"{job_name}.py" + + script_content = f'''#!/usr/bin/env python3 +""" +Generated job script for {job_class.__name__}. 
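+Supports two commands: "config" (print a JobConfigureResponse as JSON) and
+"exec" (run the job with the remaining command-line args).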
+""" + +import sys +import json +from {graph_module_path} import {job_class.__name__} +from databuild.proto import PartitionRef, JobConfigureResponse, to_dict + + +def parse_outputs_from_args(args: list[str]) -> list: + """Parse partition output references from command line arguments.""" + outputs = [] + for arg in args: + # Find which output type can deserialize this partition reference + for output_type in {job_class.__name__}.output_types: + try: + partition = output_type.deserialize(arg) + outputs.append(partition) + break + except ValueError: + continue + else: + raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {{arg}}") + + return outputs + + +if __name__ == "__main__": + if len(sys.argv) < 2: + raise Exception(f"Invalid command usage") + + command = sys.argv[1] + job_instance = {job_class.__name__}() + + if command == "config": + # Parse output partition references as PartitionRef objects (for Rust wrapper) + output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]] + + # Also parse them into DSL partition objects (for DSL job.config()) + outputs = parse_outputs_from_args(sys.argv[2:]) + + # Call job's config method - returns list[JobConfig] + configs = job_instance.config(outputs) + + # Wrap in JobConfigureResponse and serialize using to_dict() + response = JobConfigureResponse(configs=configs) + print(json.dumps(to_dict(response))) + + elif command == "exec": + # The exec method expects a JobConfig but the Rust wrapper passes args + # For now, let the DSL job handle the args directly + # TODO: This needs to be refined based on actual Rust wrapper interface + job_instance.exec(sys.argv[2:]) + + else: + raise Exception(f"Invalid command `{{sys.argv[1]}}`") +''' + + script_path = os.path.join(output_dir, script_name) + with open(script_path, "w") as f: + f.write(script_content) + + # Make it executable + os.chmod(script_path, 0o755) + + def _generate_job_lookup(self, output_dir: str, name: str) -> None: + """Generate job lookup binary that maps partition patterns to job targets.""" + import os + + # Build the job lookup mappings with full package paths + package_name = self._get_package_name() + lookup_mappings = [] + for partition_type, job_class in self.lookup.items(): + job_name = self._snake_case(job_class.__name__) + pattern = partition_type._raw_pattern + full_target = f"//{package_name}/generated:{job_name}" + lookup_mappings.append(f' r"{pattern}": "{full_target}",') + + lookup_content = f'''#!/usr/bin/env python3 +""" +Generated job lookup for DataBuild DSL graph. +Maps partition patterns to job targets. 
+""" + +import sys +import re +import json +from collections import defaultdict + + +# Mapping from partition patterns to job targets +JOB_MAPPINGS = {{ +{chr(10).join(lookup_mappings)} +}} + + +def lookup_job_for_partition(partition_ref: str) -> str: + """Look up which job can build the given partition reference.""" + for pattern, job_target in JOB_MAPPINGS.items(): + if re.match(pattern, partition_ref): + return job_target + + raise ValueError(f"No job found for partition: {{partition_ref}}") + + +def main(): + if len(sys.argv) < 2: + print("Usage: job_lookup.py [partition_ref...]", file=sys.stderr) + sys.exit(1) + + results = defaultdict(list) + try: + for partition_ref in sys.argv[1:]: + job_target = lookup_job_for_partition(partition_ref) + results[job_target].append(partition_ref) + + # Output the results as JSON (matching existing lookup format) + print(json.dumps(dict(results))) + except ValueError as e: + print(f"ERROR: {{e}}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() +''' + + lookup_file = os.path.join(output_dir, f"{name}_job_lookup.py") + with open(lookup_file, "w") as f: + f.write(lookup_content) + + # Make it executable + os.chmod(lookup_file, 0o755) + + def _snake_case(self, name: str) -> str: + """Convert CamelCase to snake_case.""" + import re + s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) + return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() + + def _get_graph_module_path(self) -> str: + """Get the module path for the graph containing this instance.""" + # Try to find the module by looking at where the graph object is defined + import inspect + import sys + + # Look through all loaded modules to find where this graph instance is defined + for module_name, module in sys.modules.items(): + if hasattr(module, 'graph') and getattr(module, 'graph') is self: + if module_name != '__main__': + return module_name + + # Look through the call stack to find the module that imported us + for frame_info in inspect.stack(): + frame_globals = frame_info.frame.f_globals + module_name = frame_globals.get('__name__') + if module_name and module_name != '__main__' and 'graph' in frame_globals: + # Check if this frame has our graph + if frame_globals.get('graph') is self: + return module_name + + # Last resort fallback - this will need to be manually configured + return "UNKNOWN_MODULE" + + def _get_package_name(self) -> str: + """Get the Bazel package name where the DSL source files are located.""" + # Extract package from the graph label if available + if hasattr(self, 'label') and self.label.startswith('//'): + # Extract package from label like "//databuild/test/app:dsl_graph" + package_part = self.label.split(':')[0] + return package_part[2:] # Remove "//" prefix + + # Fallback to trying to infer from module path + module_path = self._get_graph_module_path() + if module_path != "UNKNOWN_MODULE": + # Convert module path to package path + # e.g., "databuild.test.app.dsl.graph" -> "databuild/test/app/dsl" + parts = module_path.split('.') + if parts[-1] in ['graph', 'main']: + parts = parts[:-1] + return '/'.join(parts) + + return "UNKNOWN_PACKAGE" + @dataclass class JobConfigBuilder: diff --git a/databuild/dsl/python/dsl_job_wrapper.py b/databuild/dsl/python/dsl_job_wrapper.py new file mode 100644 index 0000000..31af892 --- /dev/null +++ b/databuild/dsl/python/dsl_job_wrapper.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph. 
+Configured via environment variables: +- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph') +- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes') +""" + +import sys +import json +import os +import importlib +from typing import List, Any +from databuild.proto import JobConfig + + +def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]: + """Parse partition output references from command line arguments into partition objects.""" + outputs = [] + for arg in args: + # Find which output type can deserialize this partition reference + for output_type in job_class.output_types: + try: + partition = output_type.deserialize(arg) + outputs.append(partition) + break + except ValueError: + continue + else: + raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}") + + return outputs + + +def main(): + if len(sys.argv) < 2: + print("Usage: dsl_job_wrapper.py [args...]", file=sys.stderr) + sys.exit(1) + + command = sys.argv[1] + + # Read configuration from environment + graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE') + job_class_name = os.environ.get('DATABUILD_JOB_CLASS') + + if not graph_module_path: + print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr) + sys.exit(1) + + if not job_class_name: + print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr) + sys.exit(1) + + try: + # Import the graph module + module = importlib.import_module(graph_module_path) + graph = getattr(module, 'graph') + + # Get the job class + job_class = getattr(module, job_class_name) + + # Create job instance + job_instance = job_class() + + except (ImportError, AttributeError) as e: + print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr) + sys.exit(1) + + if command == "config": + try: + # Parse output partition references from remaining args + output_refs = sys.argv[2:] + if not output_refs: + print("ERROR: No output partition references provided", file=sys.stderr) + sys.exit(1) + + outputs = parse_outputs_from_args(output_refs, job_class) + + # Call job's config method + configs = job_instance.config(outputs) + + # Output each config as JSON (one per line for multiple configs) + for config in configs: + # Convert JobConfig to dict for JSON serialization + config_dict = { + 'outputs': [{'str': ref.str} for ref in config.outputs], + 'inputs': [ + { + 'dep_type_code': dep.dep_type_code, + 'dep_type_name': dep.dep_type_name, + 'partition_ref': {'str': dep.partition_ref.str} + } for dep in config.inputs + ], + 'args': config.args, + 'env': config.env, + } + print(json.dumps(config_dict)) + + except Exception as e: + print(f"ERROR: Config failed: {e}", file=sys.stderr) + sys.exit(1) + + elif command == "exec": + try: + # Read config from stdin + config_json = sys.stdin.read().strip() + if not config_json: + print("ERROR: No config provided on stdin", file=sys.stderr) + sys.exit(1) + + config_dict = json.loads(config_json) + + # Convert dict back to JobConfig + from databuild.proto import PartitionRef, DataDep, DepType + + config = JobConfig( + outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']], + inputs=[ + DataDep( + dep_type_code=dep['dep_type_code'], + dep_type_name=dep['dep_type_name'], + partition_ref=PartitionRef(str=dep['partition_ref']['str']) + ) for dep in config_dict['inputs'] + ], + args=config_dict['args'], + env=config_dict['env'], + ) 
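+            # Rebuild the JobConfig dataclass field-by-field from the JSON
+            # dict; the field names here mirror the dict emitted by the
+            # "config" branch above, so the two code paths must stay in sync.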
+ + # Call job's exec method + job_instance.exec(config) + + except Exception as e: + print(f"ERROR: Execution failed: {e}", file=sys.stderr) + sys.exit(1) + + else: + print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/databuild/dsl/python/generator.py b/databuild/dsl/python/generator.py new file mode 100644 index 0000000..0551094 --- /dev/null +++ b/databuild/dsl/python/generator.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +""" +DSL code generator that can be run as a py_binary with proper dependencies. +""" + +import sys +import os +import importlib + + +def main(): + if len(sys.argv) != 4: + print("Usage: generator.py ", file=sys.stderr) + sys.exit(1) + + module_path = sys.argv[1] + graph_attr = sys.argv[2] + output_dir = sys.argv[3] + + # Extract the base name from the output directory for naming + name = os.path.basename(output_dir.rstrip('/')) or "graph" + + try: + # Import the graph module + module = importlib.import_module(module_path) + graph = getattr(module, graph_attr) + + # Generate the bazel package + graph.generate_bazel_package(name, output_dir) + + print(f"Generated DataBuild DSL package in {output_dir}") + + except ImportError as e: + print(f"ERROR: Failed to import {graph_attr} from {module_path}: {e}", file=sys.stderr) + sys.exit(1) + except AttributeError as e: + print(f"ERROR: Module {module_path} does not have attribute {graph_attr}: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"ERROR: Generation failed: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/databuild/event_log/sqlite.rs b/databuild/event_log/sqlite.rs index 37c6c1b..987fe9c 100644 --- a/databuild/event_log/sqlite.rs +++ b/databuild/event_log/sqlite.rs @@ -37,6 +37,14 @@ pub struct SqliteBuildEventLog { impl SqliteBuildEventLog { pub async fn new(path: &str) -> Result { + // Create parent directory if it doesn't exist + if let Some(parent) = std::path::Path::new(path).parent() { + std::fs::create_dir_all(parent) + .map_err(|e| BuildEventLogError::ConnectionError( + format!("Failed to create directory {}: {}", parent.display(), e) + ))?; + } + let conn = Connection::open(path) .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?; diff --git a/databuild/graph/execute.rs b/databuild/graph/execute.rs index 37a988e..4e381a3 100644 --- a/databuild/graph/execute.rs +++ b/databuild/graph/execute.rs @@ -9,7 +9,7 @@ use std::io::{BufReader, Read, Write}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::sync::Arc; -use std::thread; +use std::{env, thread}; use std::time::{Duration, Instant}; // Command line parsing removed - using environment variables use uuid::Uuid; @@ -52,48 +52,6 @@ fn get_task_key(task: &Task) -> String { key_parts.join("|") } -// Resolves the executable path from runfiles. -// Mirrors the Go implementation's resolveExecutableFromRunfiles. 
-fn resolve_executable_from_runfiles(job_label: &str) -> PathBuf { - let mut target_name = job_label.to_string(); - if let Some(colon_index) = job_label.rfind(':') { - target_name = job_label[colon_index + 1..].to_string(); - } else if let Some(name) = Path::new(job_label).file_name().and_then(|n| n.to_str()) { - target_name = name.to_string(); - } - - let exec_name = format!("{}.exec", target_name); - - if let Ok(runfiles_dir_str) = std::env::var("RUNFILES_DIR") { - let path = PathBuf::from(runfiles_dir_str).join("_main").join(&exec_name); - debug!("Resolved executable path (RUNFILES_DIR): {}", path.display()); - return path; - } - - if let Ok(current_exe) = std::env::current_exe() { - let mut runfiles_dir_path = PathBuf::from(format!("{}.runfiles", current_exe.display())); - if !runfiles_dir_path.is_dir() { // Bazel often puts it next to the binary - if let Some(parent) = current_exe.parent() { - runfiles_dir_path = parent.join(format!("{}.runfiles", current_exe.file_name().unwrap_or_default().to_string_lossy())); - } - } - - if runfiles_dir_path.is_dir() { - let path = runfiles_dir_path.join("_main").join(&exec_name); - debug!("Resolved executable path (derived RUNFILES_DIR): {}", path.display()); - return path; - } else { - warn!("Warning: RUNFILES_DIR not found or invalid, and derived path {} is not a directory.", runfiles_dir_path.display()); - } - } else { - warn!("Warning: Could not determine current executable path."); - } - - let fallback_path = PathBuf::from(format!("{}.exec", job_label)); - warn!("Falling back to direct executable path: {}", fallback_path.display()); - fallback_path -} - fn worker( task_rx: Receiver>, result_tx: Sender, @@ -105,8 +63,9 @@ fn worker( info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key); let start_time = Instant::now(); - let exec_path = resolve_executable_from_runfiles(&task.job.as_ref().unwrap().label); - + let exec_path = env::var("DATABUILD_EXECUTE_BINARY") + .map_err(|e| Error::from(format!("Env var DATABUILD_EXECUTE_BINARY not set: {}", e))); + let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) { Ok(json) => json, Err(e) => { diff --git a/databuild/rules.bzl b/databuild/rules.bzl index 2eaa792..d3836b2 100644 --- a/databuild/rules.bzl +++ b/databuild/rules.bzl @@ -651,21 +651,52 @@ def _databuild_graph_build_impl(ctx): script_content = RUNFILES_PREFIX + """ # Build DATABUILD_CANDIDATE_JOBS dynamically with proper rlocation resolution -%s +{candidate_jobs_script} export DATABUILD_CANDIDATE_JOBS="$CANDIDATE_JOBS_JSON" -export DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/%s)" -export DATABUILD_GRAPH_LABEL="%s" + +# Resolve binary paths with error checking +DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/{lookup_path})" +if [[ -z "$DATABUILD_JOB_LOOKUP_PATH" || ! -f "$DATABUILD_JOB_LOOKUP_PATH" ]]; then + echo "ERROR: Could not find job lookup binary at _main/{lookup_path}" >&2 + exit 1 +fi +export DATABUILD_JOB_LOOKUP_PATH + +DATABUILD_ANALYZE_BINARY="$(rlocation _main/{analyze_path})" +if [[ -z "$DATABUILD_ANALYZE_BINARY" || ! -f "$DATABUILD_ANALYZE_BINARY" ]]; then + echo "ERROR: Could not find analyze binary at _main/{analyze_path}" >&2 + exit 1 +fi +export DATABUILD_ANALYZE_BINARY + +DATABUILD_EXECUTE_BINARY="$(rlocation _main/{execute_path})" +if [[ -z "$DATABUILD_EXECUTE_BINARY" || ! 
-f "$DATABUILD_EXECUTE_BINARY" ]]; then + echo "ERROR: Could not find execute binary at _main/{execute_path}" >&2 + exit 1 +fi +export DATABUILD_EXECUTE_BINARY + +export DATABUILD_GRAPH_LABEL="{graph_label}" # Generate a single build request ID for the entire CLI operation export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())") -# Run unified DataBuild CLI wrapper -"$(rlocation databuild+/databuild/cli/databuild_cli)" "$@" -""" % ( - candidate_jobs_script, - ctx.attr.lookup.files_to_run.executable.short_path, - ctx.attr.graph_label, +# Run unified DataBuild CLI wrapper using the provided cli_wrapper attribute +CLI_BINARY="$(rlocation _main/{cli_path})" +if [[ -z "$CLI_BINARY" || ! -f "$CLI_BINARY" ]]; then + echo "ERROR: Could not find CLI binary at _main/{cli_path}" >&2 + exit 1 +fi + +"$CLI_BINARY" "$@" +""".format( + candidate_jobs_script = candidate_jobs_script, + lookup_path = ctx.attr.lookup.files_to_run.executable.short_path, + analyze_path = ctx.attr._analyze.files_to_run.executable.short_path, + execute_path = ctx.attr._execute.files_to_run.executable.short_path, + graph_label = ctx.attr.graph_label, + cli_path = ctx.attr.cli_wrapper.files_to_run.executable.short_path, ) ctx.actions.write( @@ -674,58 +705,29 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4()) content = script_content, ) - # Gather the configure and execute executables + # Gather the configure executables configure_executables = [ job[DataBuildJobInfo].configure.files_to_run.executable for job in ctx.attr.jobs ] - # Get the execute targets - these are the .exec files that need to be in runfiles - execute_executables = [] - for job in ctx.attr.jobs: - # The job target itself contains references to both configure and execute - # We need to find the .exec target for each job - job_name = job.label.name - exec_target_name = job_name + ".exec" - - # Find the .exec target in the same package - for attr_name in dir(job): - if attr_name.endswith("_exec") or exec_target_name in attr_name: - exec_target = getattr(job, attr_name, None) - if exec_target and hasattr(exec_target, "files_to_run"): - execute_executables.append(exec_target.files_to_run.executable) - break - - # Also check if we can access exec targets directly from job dependencies - all_job_files = [] - for job in ctx.attr.jobs: - if hasattr(job, "default_runfiles") and job.default_runfiles: - all_job_files.extend(job.default_runfiles.files.to_list()) - + # Create runfiles including the CLI binary, analyze/execute binaries and all dependencies runfiles = ctx.runfiles( - files = [ctx.executable.cli_wrapper, ctx.executable.lookup] + configure_executables + execute_executables + all_job_files, + files = [ctx.executable.cli_wrapper, ctx.executable.lookup, ctx.executable._analyze, ctx.executable._execute] + configure_executables, ).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge( - ctx.attr._bash_runfiles.default_runfiles, - ) + ctx.attr._analyze.default_runfiles, + ).merge(ctx.attr._execute.default_runfiles).merge(ctx.attr._bash_runfiles.default_runfiles).merge_all([job.default_runfiles for job in ctx.attr.jobs]) - # Merge runfiles from all configure targets and job targets + # Merge runfiles from all configure targets for job in ctx.attr.jobs: configure_target = job[DataBuildJobInfo].configure runfiles = runfiles.merge(configure_target.default_runfiles) - # Also merge the job's own runfiles which should include the .exec target - runfiles = 
runfiles.merge(job.default_runfiles) - return [ DefaultInfo( executable = script, runfiles = runfiles, ), - DataBuildGraphInfo( - analyze = ctx.attr.cli_wrapper, - exec = ctx.attr.cli_wrapper, - jobs = ctx.attr.jobs, - ), ] _databuild_graph_build = rule( @@ -755,6 +757,16 @@ _databuild_graph_build = rule( default = Label("@bazel_tools//tools/bash/runfiles"), allow_files = True, ), + "_analyze": attr.label( + default = "@databuild//databuild/graph:analyze", + executable = True, + cfg = "target", + ), + "_execute": attr.label( + default = "@databuild//databuild/graph:execute", + executable = True, + cfg = "target", + ), }, executable = True, ) @@ -902,3 +914,186 @@ _databuild_graph_service = rule( }, executable = True, ) + +def databuild_dsl_generator( + name, + graph_file, + graph_attr = "graph", + output_package = None, + deps = [], + visibility = None): + """Creates a DataBuild DSL code generator that can generate BUILD.bazel and job binaries. + + Args: + name: Name of the generator target (typically ends with .generate) + graph_file: Python file containing the DSL graph definition + graph_attr: Name of the graph attribute in the module (default: "graph") + output_package: Target package for generated files (default: current package) + deps: Dependencies needed to load the graph + visibility: Visibility specification + """ + if not output_package: + output_package = "//" + native.package_name() + + _databuild_dsl_generator_rule( + name = name, + graph_file = graph_file, + graph_attr = graph_attr, + output_package = output_package, + deps = deps, + visibility = visibility, + ) + +def _databuild_dsl_generator_impl(ctx): + """Implementation of the DSL generator rule.""" + script = ctx.actions.declare_file(ctx.label.name + "_generator.py") + + # Get the module path from the graph file + graph_file_path = ctx.file.graph_file.short_path + if graph_file_path.endswith(".py"): + graph_file_path = graph_file_path[:-3] + module_path = graph_file_path.replace("/", ".") + + # Get the package path for output + package_path = ctx.attr.output_package.strip("//").replace(":", "/") + + script_content = """#!/usr/bin/env python3 +import os +import sys + +# Determine output directory +workspace_root = os.environ.get('BUILD_WORKSPACE_DIRECTORY') +if workspace_root: + # Running with bazel run - write to source tree + output_dir = os.path.join(workspace_root, '{package_path}') +else: + # Running with bazel build - write to current directory (bazel-bin) + output_dir = '.' 
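+    # Note: BUILD_WORKSPACE_DIRECTORY is only set by `bazel run`, so this
+    # fallback applies when the script is executed outside of `bazel run`.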
+ +print(f"Generating DataBuild DSL code to {{output_dir}}") + +try: + # Setup runfiles for module resolution + try: + from python.runfiles import runfiles + r = runfiles.Create() + if r: + generator_path = r.Rlocation("databuild/databuild/dsl/python/generator") + else: + raise ImportError("runfiles not available") + except ImportError: + # Fallback - assume generator is in the same directory as this script + import subprocess + import shutil + + # Try to find the generator binary using bazel + generator_path = shutil.which("bazel") + if generator_path: + import subprocess + result = subprocess.run([ + "bazel", "run", "//databuild/dsl/python:generator", "--", + "{module_path}", "{graph_attr}", output_dir + ], cwd=workspace_root if workspace_root else ".", capture_output=True, text=True) + + if result.returncode != 0: + print(f"ERROR: Generation failed:", file=sys.stderr) + print(result.stderr, file=sys.stderr) + sys.exit(1) + + print(result.stdout) + sys.exit(0) + else: + print("ERROR: Could not find bazel to run generator", file=sys.stderr) + sys.exit(1) + + # Run the generator with proper module resolution + import subprocess + result = subprocess.run([ + generator_path, "{module_path}", "{graph_attr}", output_dir + ], capture_output=True, text=True) + + if result.returncode != 0: + print(f"ERROR: Generation failed:", file=sys.stderr) + print(result.stderr, file=sys.stderr) + sys.exit(1) + + print(result.stdout) + +except Exception as e: + print(f"ERROR: Generation failed: {{e}}", file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) +""".format( + package_path = package_path, + module_path = module_path, + graph_attr = ctx.attr.graph_attr, + name = ctx.attr.name.replace(".generate", ""), + ) + + ctx.actions.write( + output = script, + content = script_content, + is_executable = True, + ) + + # Create runfiles with all dependencies + runfiles = ctx.runfiles( + files = [ctx.file.graph_file, ctx.executable._generator], + ) + + # Merge runfiles from all dependencies + for dep in ctx.attr.deps: + if hasattr(dep, "default_runfiles"): + runfiles = runfiles.merge(dep.default_runfiles) + + # Add protobuf dependencies + if hasattr(ctx.attr._py_proto, "default_runfiles"): + runfiles = runfiles.merge(ctx.attr._py_proto.default_runfiles) + + # Add generator runfiles + runfiles = runfiles.merge(ctx.attr._generator.default_runfiles) + + # Also add Python runfiles for proper module resolution + runfiles = runfiles.merge(ctx.attr._python_runfiles.default_runfiles) + + return [DefaultInfo( + executable = script, + runfiles = runfiles, + )] + +_databuild_dsl_generator_rule = rule( + implementation = _databuild_dsl_generator_impl, + attrs = { + "graph_file": attr.label( + doc = "Python file containing the DSL graph definition", + allow_single_file = [".py"], + mandatory = True, + ), + "graph_attr": attr.string( + doc = "Name of the graph attribute in the module", + default = "graph", + ), + "output_package": attr.string( + doc = "Target package for generated files", + mandatory = True, + ), + "deps": attr.label_list( + doc = "Dependencies needed to load the graph", + allow_empty = True, + ), + "_python_runfiles": attr.label( + default = "@rules_python//python/runfiles", + allow_files = True, + ), + "_py_proto": attr.label( + default = "//databuild:py_proto", + ), + "_generator": attr.label( + default = "//databuild/dsl/python:generator", + executable = True, + cfg = "target", + ), + }, + executable = True, +) diff --git a/databuild/test/app/dsl/BUILD.bazel 
b/databuild/test/app/dsl/BUILD.bazel index 61a3a09..10ab6b6 100644 --- a/databuild/test/app/dsl/BUILD.bazel +++ b/databuild/test/app/dsl/BUILD.bazel @@ -1,3 +1,5 @@ +load("@databuild//databuild:rules.bzl", "databuild_dsl_generator") + py_library( name = "dsl_src", srcs = glob( @@ -11,3 +13,12 @@ py_library( "//databuild/test/app:job_src", ], ) + +databuild_dsl_generator( + name = "graph.generate", + graph_file = "graph.py", + graph_attr = "graph", + output_package = "//databuild/test/app/dsl", + deps = [":dsl_src"], + visibility = ["//visibility:public"], +) diff --git a/databuild/test/app/dsl/dsl_job_lookup.py b/databuild/test/app/dsl/dsl_job_lookup.py new file mode 100755 index 0000000..375a425 --- /dev/null +++ b/databuild/test/app/dsl/dsl_job_lookup.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +""" +Generated job lookup for DataBuild DSL graph. +Maps partition patterns to job targets. +""" + +import sys +import re + + +# Mapping from partition patterns to job targets +JOB_MAPPINGS = { + r"daily_color_votes/(?P\d{4}-\d{2}-\d{2})/(?P[^/]+)": ":ingest_color_votes", + r"color_votes_1m/(?P\d{4}-\d{2}-\d{2})/(?P[^/]+)": ":trailing_color_votes", + r"color_votes_1w/(?P\d{4}-\d{2}-\d{2})/(?P[^/]+)": ":trailing_color_votes", + r"daily_votes/(?P\d{4}-\d{2}-\d{2})": ":aggregate_color_votes", + r"votes_1w/(?P\d{4}-\d{2}-\d{2})": ":aggregate_color_votes", + r"votes_1m/(?P\d{4}-\d{2}-\d{2})": ":aggregate_color_votes", + r"color_vote_report/(?P\d{4}-\d{2}-\d{2})/(?P[^/]+)": ":color_vote_report_calc", +} + + +def lookup_job_for_partition(partition_ref: str) -> str: + """Look up which job can build the given partition reference.""" + for pattern, job_target in JOB_MAPPINGS.items(): + if re.match(pattern, partition_ref): + return job_target + + raise ValueError(f"No job found for partition: {partition_ref}") + + +def main(): + if len(sys.argv) != 2: + print("Usage: job_lookup.py ", file=sys.stderr) + sys.exit(1) + + partition_ref = sys.argv[1] + try: + job_target = lookup_job_for_partition(partition_ref) + print(job_target) + except ValueError as e: + print(f"ERROR: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/databuild/test/app/dsl/dsl_job_wrapper.py b/databuild/test/app/dsl/dsl_job_wrapper.py new file mode 100644 index 0000000..31af892 --- /dev/null +++ b/databuild/test/app/dsl/dsl_job_wrapper.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Shared DSL job wrapper that can execute any DataBuildJob defined in a DSL graph. 
+Configured via environment variables: +- DATABUILD_DSL_GRAPH_MODULE: Python module path containing the graph (e.g., 'databuild.test.app.dsl.graph') +- DATABUILD_JOB_CLASS: Job class name to execute (e.g., 'IngestColorVotes') +""" + +import sys +import json +import os +import importlib +from typing import List, Any +from databuild.proto import JobConfig + + +def parse_outputs_from_args(args: List[str], job_class: Any) -> List[Any]: + """Parse partition output references from command line arguments into partition objects.""" + outputs = [] + for arg in args: + # Find which output type can deserialize this partition reference + for output_type in job_class.output_types: + try: + partition = output_type.deserialize(arg) + outputs.append(partition) + break + except ValueError: + continue + else: + raise ValueError(f"No output type in {job_class.__name__} can deserialize partition ref: {arg}") + + return outputs + + +def main(): + if len(sys.argv) < 2: + print("Usage: dsl_job_wrapper.py [args...]", file=sys.stderr) + sys.exit(1) + + command = sys.argv[1] + + # Read configuration from environment + graph_module_path = os.environ.get('DATABUILD_DSL_GRAPH_MODULE') + job_class_name = os.environ.get('DATABUILD_JOB_CLASS') + + if not graph_module_path: + print("ERROR: DATABUILD_DSL_GRAPH_MODULE environment variable not set", file=sys.stderr) + sys.exit(1) + + if not job_class_name: + print("ERROR: DATABUILD_JOB_CLASS environment variable not set", file=sys.stderr) + sys.exit(1) + + try: + # Import the graph module + module = importlib.import_module(graph_module_path) + graph = getattr(module, 'graph') + + # Get the job class + job_class = getattr(module, job_class_name) + + # Create job instance + job_instance = job_class() + + except (ImportError, AttributeError) as e: + print(f"ERROR: Failed to load job {job_class_name} from {graph_module_path}: {e}", file=sys.stderr) + sys.exit(1) + + if command == "config": + try: + # Parse output partition references from remaining args + output_refs = sys.argv[2:] + if not output_refs: + print("ERROR: No output partition references provided", file=sys.stderr) + sys.exit(1) + + outputs = parse_outputs_from_args(output_refs, job_class) + + # Call job's config method + configs = job_instance.config(outputs) + + # Output each config as JSON (one per line for multiple configs) + for config in configs: + # Convert JobConfig to dict for JSON serialization + config_dict = { + 'outputs': [{'str': ref.str} for ref in config.outputs], + 'inputs': [ + { + 'dep_type_code': dep.dep_type_code, + 'dep_type_name': dep.dep_type_name, + 'partition_ref': {'str': dep.partition_ref.str} + } for dep in config.inputs + ], + 'args': config.args, + 'env': config.env, + } + print(json.dumps(config_dict)) + + except Exception as e: + print(f"ERROR: Config failed: {e}", file=sys.stderr) + sys.exit(1) + + elif command == "exec": + try: + # Read config from stdin + config_json = sys.stdin.read().strip() + if not config_json: + print("ERROR: No config provided on stdin", file=sys.stderr) + sys.exit(1) + + config_dict = json.loads(config_json) + + # Convert dict back to JobConfig + from databuild.proto import PartitionRef, DataDep, DepType + + config = JobConfig( + outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']], + inputs=[ + DataDep( + dep_type_code=dep['dep_type_code'], + dep_type_name=dep['dep_type_name'], + partition_ref=PartitionRef(str=dep['partition_ref']['str']) + ) for dep in config_dict['inputs'] + ], + args=config_dict['args'], + env=config_dict['env'], + ) 
+ + # Call job's exec method + job_instance.exec(config) + + except Exception as e: + print(f"ERROR: Execution failed: {e}", file=sys.stderr) + sys.exit(1) + + else: + print(f"ERROR: Unknown command '{command}'. Use 'config' or 'exec'.", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/databuild/test/app/dsl/graph.py b/databuild/test/app/dsl/graph.py index 1d1fdcf..c88189d 100644 --- a/databuild/test/app/dsl/graph.py +++ b/databuild/test/app/dsl/graph.py @@ -19,7 +19,7 @@ from databuild.test.app.dsl.partitions import ( ) from datetime import date, timedelta -graph = DataBuildGraph("//databuild/test/app:dsl_graph") +graph = DataBuildGraph("//databuild/test/app/dsl:dsl_graph") @graph.job