Make dsl generation work for submodules

This commit is contained in:
Stuart Axelbrooke 2025-08-06 22:16:01 -07:00
parent f6e6dad32c
commit ba18734190
9 changed files with 8562 additions and 18 deletions


@@ -84,12 +84,13 @@ class DataBuildGraph:
"""Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
raise NotImplementedError
def generate_bazel_package(self, name: str, output_dir: str) -> None:
def generate_bazel_package(self, name: str, output_dir: str, deps: list = None) -> None:
"""Generate BUILD.bazel and binaries into a generated/ subdirectory.
Args:
name: Base name for the generated graph (without .generate suffix)
output_dir: Directory to write generated files to (will create generated/ subdir)
deps: List of Bazel dependency labels to use in generated BUILD.bazel
"""
import os
import shutil
@@ -99,7 +100,7 @@ class DataBuildGraph:
os.makedirs(generated_dir, exist_ok=True)
# Generate BUILD.bazel with job and graph targets
self._generate_build_bazel(generated_dir, name)
self._generate_build_bazel(generated_dir, name, deps or [])
# Generate individual job scripts (instead of shared wrapper)
self._generate_job_scripts(generated_dir)
@@ -114,15 +115,20 @@ class DataBuildGraph:
else:
print(f"Run 'bazel build generated:{name}_graph.analyze' to use the generated graph")
def _generate_build_bazel(self, output_dir: str, name: str) -> None:
def _generate_build_bazel(self, output_dir: str, name: str, deps: list) -> None:
"""Generate BUILD.bazel with databuild_job and databuild_graph targets."""
import os
# Get job classes from the lookup table
job_classes = list(set(self.lookup.values()))
# Get parent package for dependencies
parent_package = self._get_package_name()
# Format deps for BUILD.bazel
if deps:
deps_str = ", ".join([f'"{dep}"' for dep in deps])
else:
# Fallback to parent package if no deps provided
parent_package = self._get_package_name()
deps_str = f'"//{parent_package}:dsl_src"'
# Generate py_binary targets for each job
job_binaries = []
@@ -138,7 +144,7 @@
name = "{binary_name}",
srcs = ["{job_script_name}"],
main = "{job_script_name}",
deps = ["//{parent_package}:dsl_src"],
deps = [{deps_str}],
)
databuild_job(
@@ -157,7 +163,7 @@ databuild_job(
py_binary(
name = "{name}_job_lookup",
srcs = ["{name}_job_lookup.py"],
deps = ["//{parent_package}:dsl_src"],
deps = [{deps_str}],
)
databuild_graph(


@@ -7,7 +7,7 @@ import os
import importlib
def generate_dsl_package(module_path: str, graph_attr: str, output_dir: str):
def generate_dsl_package(module_path: str, graph_attr: str, output_dir: str, deps: list = None):
"""
Generate DataBuild DSL package from a graph definition.
@@ -15,6 +15,7 @@ def generate_dsl_package(module_path: str, graph_attr: str, output_dir: str):
module_path: Python module path (e.g., "databuild.test.app.dsl.graph")
graph_attr: Name of the graph attribute in the module
output_dir: Directory where to generate the DSL package
deps: List of Bazel dependency labels to use in generated BUILD.bazel
"""
# Extract the base name from the output directory for naming
name = os.path.basename(output_dir.rstrip('/')) or "graph"
@@ -25,7 +26,7 @@ def generate_dsl_package(module_path: str, graph_attr: str, output_dir: str):
graph = getattr(module, graph_attr)
# Generate the bazel package
graph.generate_bazel_package(name, output_dir)
graph.generate_bazel_package(name, output_dir, deps or [])
print(f"Generated DataBuild DSL package in {output_dir}")


@@ -970,7 +970,7 @@ def databuild_dsl_generator(
visibility = visibility,
)
def _generate_custom_generator_script(module_path, graph_attr, package_path):
def _generate_custom_generator_script(module_path, graph_attr, package_path, deps):
"""Generate the custom generator script content with embedded parameters."""
return """#!/usr/bin/env python3
import os
@@ -981,9 +981,8 @@ import sys
script_path = os.path.abspath(__file__)
runfiles_dir = script_path + '.runfiles'
# Debug: Runfiles path setup
# print(f"DEBUG: Script path: {{script_path}}", file=sys.stderr)
# print(f"DEBUG: Looking for runfiles at: {{runfiles_dir}}", file=sys.stderr)
# Debug: Runfiles path setup for cross-workspace usage
# Setting up runfiles paths for cross-workspace usage
if os.path.exists(runfiles_dir):
# Found runfiles directory, add _main to Python path
@@ -991,8 +990,19 @@ if os.path.exists(runfiles_dir):
if os.path.exists(main_runfiles_path):
sys.path.insert(0, main_runfiles_path)
# Successfully added main runfiles path
# Check what other directories exist in runfiles for cross-workspace usage
# All runfiles directories available
else:
print("DEBUG: _main directory not found in runfiles", file=sys.stderr)
# _main directory not found in runfiles
pass
# Add external repository runfiles (like databuild+) for cross-workspace usage
for entry in os.listdir(runfiles_dir):
if entry.endswith('+') and os.path.isdir(os.path.join(runfiles_dir, entry)):
external_path = os.path.join(runfiles_dir, entry)
sys.path.insert(0, external_path)
# Added external repository path
# Also add pip package runfiles to Python path
for entry in os.listdir(runfiles_dir):
@@ -1000,9 +1010,9 @@ if os.path.exists(runfiles_dir):
pip_site_packages = os.path.join(runfiles_dir, entry, 'site-packages')
if os.path.exists(pip_site_packages):
sys.path.insert(0, pip_site_packages)
# Successfully added pip package path
# Added pip package path
else:
print("DEBUG: Runfiles directory not found, using workspace root", file=sys.stderr)
# Runfiles directory not found, falling back to workspace root
# If runfiles not available, we're probably running in development
# Add the workspace root to the path
workspace_root = os.environ.get('BUILD_WORKSPACE_DIRECTORY')
@@ -1025,7 +1035,7 @@ def main():
print(f"Generating DataBuild DSL code to {{output_dir}}")
try:
generate_dsl_package('{module_path}', '{graph_attr}', output_dir)
generate_dsl_package('{module_path}', '{graph_attr}', output_dir, {deps})
except Exception as e:
print(f"ERROR: {{e}}", file=sys.stderr)
import traceback
@@ -1038,6 +1048,7 @@ if __name__ == "__main__":
module_path=module_path,
graph_attr=graph_attr,
package_path=package_path,
deps=deps,
)
def _databuild_dsl_generator_impl(ctx):
@@ -1055,10 +1066,14 @@ def _databuild_dsl_generator_impl(ctx):
package_path = ctx.attr.output_package.strip("//").replace(":", "/")
# Generate script content with embedded parameters
# Convert deps to list of strings
dep_labels = [str(dep.label) for dep in ctx.attr.deps] if ctx.attr.deps else []
script_content = _generate_custom_generator_script(
module_path=module_path,
graph_attr=ctx.attr.graph_attr,
package_path=package_path
package_path=package_path,
deps=dep_labels
)
ctx.actions.write(
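
Because the script template is filled in with `str.format`, the `{deps}` placeholder receives the `repr` of the label list and so lands in the generated script as a Python list literal. A small sketch of that substitution (only the relevant template fragment is shown; the module path and label values are illustrative):

```python
# Fragment of the generated-script template that matters for deps handling.
TEMPLATE = "generate_dsl_package('{module_path}', '{graph_attr}', output_dir, {deps})"

# In the rule implementation, dep_labels = [str(dep.label) for dep in ctx.attr.deps].
dep_labels = ["//:simple_dsl_src"]

print(TEMPLATE.format(
    module_path="my_package.my_graph",  # illustrative
    graph_attr="graph",
    deps=dep_labels,
))
# generate_dsl_package('my_package.my_graph', 'graph', output_dir, ['//:simple_dsl_src'])
```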


@@ -0,0 +1,22 @@
load("@databuild//databuild:rules.bzl", "databuild_dsl_generator")

# Python DSL library containing the graph definition
py_library(
    name = "simple_dsl_src",
    srcs = ["simple_graph.py"],
    visibility = ["//visibility:public"],
    deps = [
        "@databuild//databuild:py_proto",
        "@databuild//databuild/dsl/python:dsl",
    ],
)

# DSL generator that creates bazel targets from the Python DSL
databuild_dsl_generator(
    name = "simple_graph.generate",
    graph_attr = "graph",
    graph_file = "simple_graph.py",
    output_package = "//",
    visibility = ["//visibility:public"],
    deps = [":simple_dsl_src"],
)


@@ -0,0 +1,26 @@
module(
    name = "simple_python_dsl_example",
    version = "0.1",
)

# Databuild dep - overridden so ignore version
bazel_dep(name = "databuild", version = "0.0")
local_path_override(
    module_name = "databuild",
    path = "../..",
)

bazel_dep(name = "rules_python", version = "1.3.0")

python = use_extension("@rules_python//python/extensions:python.bzl", "python")
python.toolchain(
    python_version = "3.12",
)

pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
pip.parse(
    hub_name = "pypi",
    python_version = "3.12",
    requirements_lock = "//:requirements_lock.txt",
)
use_repo(pip, "pypi")

File diff suppressed because one or more lines are too long


@@ -0,0 +1,87 @@
# Simple Python DSL Example
This example demonstrates how to use DataBuild's Python DSL to define a simple data processing pipeline.
## Overview
The example defines a basic 3-stage data processing pipeline:
1. **IngestRawData**: Ingests raw data for a specific date
2. **ProcessData**: Processes the raw data into a processed format
3. **CreateSummary**: Creates summary statistics from processed data
## Files
- `simple_graph.py`: Python DSL definition of the data pipeline
- `BUILD.bazel`: Bazel build configuration
- `MODULE.bazel`: Bazel module configuration for dependencies
## Usage
### Generate DSL Targets
The DSL generator can create Bazel targets from the Python DSL definition:
```bash
bazel run //:simple_graph.generate
```
This will generate Bazel targets in the `generated/` directory.
### Build Individual Jobs
```bash
# Build a specific job
bazel build //:ingest_raw_data
# Build all jobs
bazel build //:simple_graph
```
### Analyze the Graph
```bash
# Analyze what jobs would run for specific partitions
bazel run //:simple_graph.analyze -- "summary/date=2024-01-01"
```
### Run the Graph
```bash
# Build specific partitions
bazel run //:simple_graph.build -- "summary/date=2024-01-01"
```
## Cross-Workspace Usage
This example can be consumed from external workspaces by adding DataBuild as a dependency in your `MODULE.bazel`:
```starlark
bazel_dep(name = "databuild", version = "0.0")
local_path_override(
module_name = "databuild",
path = "path/to/databuild",
)
```
Then you can reference and extend this example:
```python
from databuild.dsl.python.dsl import DataBuildGraph
# Import and extend the simple graph
```
## Testing
To test that the DSL generator works correctly:
```bash
# Test the DSL generation
bazel run //:simple_graph.generate
# Verify generated files exist
ls generated/
# Test job lookup
bazel run //:job_lookup -- "raw_data/date=2024-01-01"
```


@@ -0,0 +1,2 @@
# Simple Python DSL example - minimal requirements
# This file would normally be generated by pip-compile


@@ -0,0 +1,124 @@
"""
Simple Python DSL example - basic data processing pipeline
"""

from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder, PartitionPattern
from databuild.proto import JobConfig
from datetime import date
import os

# Define the graph
graph = DataBuildGraph("//:simple_graph")


class RawDataPartition(PartitionPattern):
    """Represents raw input data for a specific date"""

    _raw_pattern = r"raw_data/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"raw_data/date={self.date}"


class ProcessedDataPartition(PartitionPattern):
    """Represents processed data for a specific date"""

    _raw_pattern = r"processed_data/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"processed_data/date={self.date}"


class SummaryPartition(PartitionPattern):
    """Represents summary data for a specific date"""

    _raw_pattern = r"summary/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"summary/date={self.date}"


@graph.job
class IngestRawData(DataBuildJob):
    """Job to ingest raw data for a given date"""

    output_types = [RawDataPartition]

    def config(self, outputs: list[RawDataPartition]) -> list[JobConfig]:
        configs = []
        for output in outputs:
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        # Simple implementation - just create a dummy file
        data_date = os.environ["DATA_DATE"]
        print(f"Ingesting raw data for {data_date}")
        # In a real job, this would read from external sources
        print(f"Raw data ingested successfully for {data_date}")


@graph.job
class ProcessData(DataBuildJob):
    """Job to process raw data into processed format"""

    output_types = [ProcessedDataPartition]

    def config(self, outputs: list[ProcessedDataPartition]) -> list[JobConfig]:
        configs = []
        for output in outputs:
            raw_input = RawDataPartition(date=output.date)
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .add_inputs(raw_input)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        data_date = os.environ["DATA_DATE"]
        print(f"Processing data for {data_date}")
        # In a real job, this would transform the raw data
        print(f"Data processed successfully for {data_date}")


@graph.job
class CreateSummary(DataBuildJob):
    """Job to create summary from processed data"""

    output_types = [SummaryPartition]

    def config(self, outputs: list[SummaryPartition]) -> list[JobConfig]:
        configs = []
        for output in outputs:
            processed_input = ProcessedDataPartition(date=output.date)
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .add_inputs(processed_input)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        data_date = os.environ["DATA_DATE"]
        print(f"Creating summary for {data_date}")
        # In a real job, this would generate summary statistics
        print(f"Summary created successfully for {data_date}")