Fix service binary and static asset serving

This commit is contained in:
Stuart Axelbrooke 2025-08-04 11:18:28 -07:00
parent e8f38399fa
commit bef37cd8ab
10 changed files with 107 additions and 103 deletions

View file

@ -8,7 +8,7 @@ use databuild::repositories::{
builds::BuildsRepository
};
use clap::{Arg, Command as ClapCommand, ArgMatches};
use log::info;
use log::{info, error};
use simple_logger::SimpleLogger;
use std::env;
use std::process::{Command, Stdio};
@ -26,8 +26,8 @@ async fn run_analysis(
info!("Running analysis for partitions: {:?}", partitions);
// Get required environment variables
let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS")
.map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS not set".to_string()))?;
let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
.map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS_CFG not set".to_string()))?;
let job_lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
.map_err(|_| CliError::Environment("DATABUILD_JOB_LOOKUP_PATH not set".to_string()))?;
let graph_label = env::var("DATABUILD_GRAPH_LABEL")
@ -40,7 +40,7 @@ async fn run_analysis(
// Build analyze command
let cmd = Command::new(analyze_path)
.args(partitions)
.env("DATABUILD_CANDIDATE_JOBS", candidate_jobs)
.env("DATABUILD_CANDIDATE_JOBS_CFG", candidate_jobs)
.env("DATABUILD_JOB_LOOKUP_PATH", job_lookup_path)
.env("DATABUILD_GRAPH_LABEL", graph_label)
.env("DATABUILD_MODE", "plan")
@ -78,8 +78,8 @@ async fn run_execution(
.map_err(|e| CliError::Execution(format!("Failed to serialize job graph: {}", e)))?;
// Get required environment variables
let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS")
.map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS not set".to_string()))?;
let candidate_jobs = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
.map_err(|_| CliError::Environment("DATABUILD_CANDIDATE_JOBS_CFG not set".to_string()))?;
let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").unwrap_or_else(|_| "stdout".to_string());
// Find execute binary using runfiles
@ -88,7 +88,7 @@ async fn run_execution(
// Build execute command
let mut cmd = Command::new(execute_path)
.env("DATABUILD_CANDIDATE_JOBS", candidate_jobs)
.env("DATABUILD_CANDIDATE_JOBS_CFG", candidate_jobs)
.env("DATABUILD_BUILD_EVENT_LOG", build_event_log_uri)
.env("DATABUILD_BUILD_REQUEST_ID", orchestrator.build_request_id())
.stdin(Stdio::piped())
@ -109,7 +109,8 @@ async fn run_execution(
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(CliError::Execution(format!("Execution failed: {}", stderr)));
error!("Execution failed:\n{}", stderr);
return Err(CliError::Execution("Execution failed".to_string()));
}
// For now, assume success if the command completed without error
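The execution hunk above now logs the child's full stderr via `error!` and returns a shorter error message to the caller. A minimal sketch of that log-then-return pattern, with the error type simplified to `String` (the real code uses `CliError::Execution`) and the function name purely illustrative:

```rust
// Sketch of the pattern adopted above: log full diagnostics, return a compact error.
// Error type simplified to String; the CLI itself uses CliError::Execution.
use std::process::Command;

fn run_child(path: &str) -> Result<(), String> {
    let output = Command::new(path)
        .output()
        .map_err(|e| format!("failed to spawn {}: {}", path, e))?;
    if !output.status.success() {
        // Full diagnostics go to the log...
        let stderr = String::from_utf8_lossy(&output.stderr);
        log::error!("Execution failed:\n{}", stderr);
        // ...while the caller gets a compact error.
        return Err("Execution failed".to_string());
    }
    Ok(())
}
```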

View file

@ -65,7 +65,7 @@ class DataBuildJob(Protocol):
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
def exec(self, config: JobConfig) -> None: ...
def exec(self, *args: str) -> None: ...
class DataBuildGraph:
@ -237,7 +237,7 @@ if __name__ == "__main__":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(sys.argv[2:])
job_instance.exec(*sys.argv[2:])
else:
raise Exception(f"Invalid command `{{sys.argv[1]}}`")

View file

@ -103,31 +103,7 @@ def main():
elif command == "exec":
try:
# Read config from stdin
config_json = sys.stdin.read().strip()
if not config_json:
print("ERROR: No config provided on stdin", file=sys.stderr)
sys.exit(1)
config_dict = json.loads(config_json)
# Convert dict back to JobConfig
from databuild.proto import PartitionRef, DataDep, DepType
config = JobConfig(
outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']],
inputs=[
DataDep(
dep_type_code=dep['dep_type_code'],
dep_type_name=dep['dep_type_name'],
partition_ref=PartitionRef(str=dep['partition_ref']['str'])
) for dep in config_dict['inputs']
],
args=config_dict['args'],
env=config_dict['env'],
)
# Call job's exec method
job_instance.exec(config)
job_instance.exec(*sys.argv[2:])
except Exception as e:
print(f"ERROR: Execution failed: {e}", file=sys.stderr)

View file

@ -13,11 +13,11 @@ use databuild::mermaid_utils::generate_mermaid_diagram;
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
.map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
.map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
.map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;
.map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
// Look up the executable path for this job
let exec_path = job_path_map.get(job_label)

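For reference, a self-contained sketch of the configure-side lookup the hunk above performs, assuming the renamed variable holds a JSON object mapping job labels to configure-binary paths and that serde_json is available as in the surrounding code; the helper name is illustrative, not part of the codebase:

```rust
// Illustrative helper (not in the codebase): resolve a job's configure binary
// from the DATABUILD_CANDIDATE_JOBS_CFG JSON map, as the hunk above does.
use std::collections::HashMap;
use std::env;

fn configure_path_for(job_label: &str) -> Result<String, String> {
    let raw = env::var("DATABUILD_CANDIDATE_JOBS_CFG")
        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
    let map: HashMap<String, String> = serde_json::from_str(&raw)
        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_CFG: {}", e))?;
    map.get(job_label)
        .cloned()
        .ok_or_else(|| format!("Job {} is not a candidate job", job_label))
}
```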
View file

@ -63,8 +63,16 @@ fn worker(
info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key);
let start_time = Instant::now();
let exec_path = env::var("DATABUILD_EXECUTE_BINARY")
.map_err(|e| Error::from(format!("Env var DATABUILD_EXECUTE_BINARY not set: {}", e)));
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS_EXEC")
.map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_EXEC: {}", e)).unwrap();
let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
.map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_EXEC: {}", e)).unwrap();
// Look up the executable path for this job
let job_label = &task.job.as_ref().unwrap().label;
let exec_path = job_path_map.get(job_label)
.ok_or_else(|| format!("Job {} is not a candidate job", job_label)).unwrap();
let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) {
Ok(json) => json,
@ -88,7 +96,8 @@ fn worker(
// Generate a job run ID for this execution
let job_run_id = Uuid::new_v4().to_string();
info!("Running job {} (Path: {}) with config: {}", job_label, exec_path, config_json);
let mut cmd = Command::new(&exec_path);
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())

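The new worker code above resolves the job binary from `DATABUILD_CANDIDATE_JOBS_EXEC` but `unwrap()`s each step. A hedged sketch of the same flow with errors propagated instead, including the stdin hand-off of the serialized config that the surrounding hunk sets up; the function shape and the `String` error type are illustrative, not the crate's actual API:

```rust
// Sketch only: resolve the job's executable from DATABUILD_CANDIDATE_JOBS_EXEC,
// then feed the serialized config over stdin. Errors are propagated as String.
use std::collections::HashMap;
use std::io::Write;
use std::process::{Command, Stdio};

fn run_task(job_label: &str, config_json: &str) -> Result<(), String> {
    let raw = std::env::var("DATABUILD_CANDIDATE_JOBS_EXEC")
        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS_EXEC: {}", e))?;
    let map: HashMap<String, String> = serde_json::from_str(&raw)
        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS_EXEC: {}", e))?;
    let exec_path = map
        .get(job_label)
        .ok_or_else(|| format!("Job {} is not a candidate job", job_label))?;

    let mut child = Command::new(exec_path)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .map_err(|e| format!("Failed to spawn {}: {}", exec_path, e))?;
    child
        .stdin
        .as_mut()
        .ok_or("Failed to open child stdin")?
        .write_all(config_json.as_bytes())
        .map_err(|e| format!("Failed to write config: {}", e))?;
    let status = child.wait().map_err(|e| format!("Failed to wait on child: {}", e))?;
    if status.success() { Ok(()) } else { Err("Job exited non-zero".to_string()) }
}
```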
View file

@ -127,9 +127,10 @@ def _databuild_job_exec_impl(ctx):
# Create a simple script that calls the job wrapper with the original binary
script_content = RUNFILES_PREFIX + """
export DATABUILD_JOB_BINARY="$(rlocation _main/{execute_path})"
exec "$(rlocation databuild+/databuild/job/job_wrapper)" exec "$@"
"$(rlocation _main/{wrapper_path})" exec $@
""".format(
execute_path = ctx.attr.execute.files_to_run.executable.short_path,
wrapper_path = ctx.attr._job_wrapper.files_to_run.executable.short_path,
)
ctx.actions.write(
@ -388,7 +389,7 @@ def _databuild_graph_analyze_impl(ctx):
]) + "'"
env_setup = """
export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_job_env_var}"
export DATABUILD_MODE=plan
export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
""".format(
@ -479,7 +480,7 @@ def _databuild_graph_mermaid_impl(ctx):
]) + "'"
env_setup = """
export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_job_env_var}"
export DATABUILD_MODE=mermaid
export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
""".format(
@ -633,27 +634,45 @@ def _databuild_graph_build_impl(ctx):
"""Wraps the DataBuild CLI wrapper in a shell script."""
script = ctx.actions.declare_file(ctx.label.name)
# Build DATABUILD_CANDIDATE_JOBS JSON string with runtime rlocation resolution
candidate_jobs_script_lines = ["CANDIDATE_JOBS_JSON=\"{\""]
# Build DATABUILD_CANDIDATE_JOBS_CFG JSON string with runtime rlocation resolution
candidate_jobs_cfg_script_lines = ["CANDIDATE_JOBS_JSON_CFG=\"{\""]
for i, job in enumerate(ctx.attr.jobs):
job_label = "//" + job.label.package + ":" + job.label.name
configure_path = job[DataBuildJobInfo].configure.files_to_run.executable.short_path
separator = "," if i < len(ctx.attr.jobs) - 1 else ""
candidate_jobs_script_lines.append(
'CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
candidate_jobs_cfg_script_lines.append(
'CANDIDATE_JOBS_JSON_CFG="${CANDIDATE_JOBS_JSON_CFG}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
job_label,
configure_path,
separator,
),
)
candidate_jobs_script_lines.append('CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}}"')
candidate_jobs_script = "\n".join(candidate_jobs_script_lines)
candidate_jobs_cfg_script_lines.append('CANDIDATE_JOBS_JSON_CFG="${CANDIDATE_JOBS_JSON_CFG}}"')
candidate_jobs_cfg_script = "\n".join(candidate_jobs_cfg_script_lines)
# Build DATABUILD_CANDIDATE_JOBS_EXEC JSON string with runtime rlocation resolution
candidate_jobs_exec_script_lines = ["CANDIDATE_JOBS_JSON_EXEC=\"{\""]
for i, job in enumerate(ctx.attr.jobs):
job_label = "//" + job.label.package + ":" + job.label.name
configure_path = job[DataBuildJobInfo].execute.short_path
separator = "," if i < len(ctx.attr.jobs) - 1 else ""
candidate_jobs_exec_script_lines.append(
'CANDIDATE_JOBS_JSON_EXEC="${CANDIDATE_JOBS_JSON_EXEC}\\"%s\\":\\"$(rlocation _main/%s.exec)\\"%s"' % (
job_label,
configure_path,
separator,
),
)
candidate_jobs_exec_script_lines.append('CANDIDATE_JOBS_JSON_EXEC="${CANDIDATE_JOBS_JSON_EXEC}}"')
candidate_jobs_exec_script = "\n".join(candidate_jobs_exec_script_lines)
script_content = RUNFILES_PREFIX + """
# Build DATABUILD_CANDIDATE_JOBS dynamically with proper rlocation resolution
{candidate_jobs_script}
# Build DATABUILD_CANDIDATE_JOBS_CFG dynamically with proper rlocation resolution
{candidate_jobs_cfg_script}
{candidate_jobs_exec_script}
export DATABUILD_CANDIDATE_JOBS="$CANDIDATE_JOBS_JSON"
export DATABUILD_CANDIDATE_JOBS_CFG="$CANDIDATE_JOBS_JSON_CFG"
export DATABUILD_CANDIDATE_JOBS_EXEC="$CANDIDATE_JOBS_JSON_EXEC"
# Resolve binary paths with error checking
DATABUILD_JOB_LOOKUP_PATH="$(rlocation _main/{lookup_path})"
@ -691,7 +710,8 @@ fi
"$CLI_BINARY" "$@"
""".format(
candidate_jobs_script = candidate_jobs_script,
candidate_jobs_cfg_script = candidate_jobs_cfg_script,
candidate_jobs_exec_script = candidate_jobs_exec_script,
lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
analyze_path = ctx.attr._analyze.files_to_run.executable.short_path,
execute_path = ctx.attr._execute.files_to_run.executable.short_path,
@ -711,9 +731,15 @@ fi
for job in ctx.attr.jobs
]
# Gather the exec executables
exec_executables = [
job[DataBuildJobInfo].execute
for job in ctx.attr.jobs
]
# Create runfiles including the CLI binary, analyze/execute binaries and all dependencies
runfiles = ctx.runfiles(
files = [ctx.executable.cli_wrapper, ctx.executable.lookup, ctx.executable._analyze, ctx.executable._execute] + configure_executables,
files = [ctx.executable.cli_wrapper, ctx.executable.lookup, ctx.executable._analyze, ctx.executable._execute] + configure_executables + exec_executables,
).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge(
ctx.attr._analyze.default_runfiles,
).merge(ctx.attr._execute.default_runfiles).merge(ctx.attr._bash_runfiles.default_runfiles).merge_all([job.default_runfiles for job in ctx.attr.jobs])
@ -775,7 +801,7 @@ def _databuild_graph_service_impl(ctx):
"""Implementation of the service target that runs the Build Graph Service."""
script = ctx.actions.declare_file(ctx.label.name)
# Build job configurations mapping for DATABUILD_CANDIDATE_JOBS
# Build job configurations mapping for DATABUILD_CANDIDATE_JOBS_CFG
config_paths = {
"//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
for job in ctx.attr.jobs
@ -787,21 +813,21 @@ def _databuild_graph_service_impl(ctx):
default_db = "sqlite:///tmp/%s.db" % ctx.label.name.replace(".", "_")
env_setup = """
export DATABUILD_CANDIDATE_JOBS="{candidate_jobs}"
export DATABUILD_CANDIDATE_JOBS_CFG="{candidate_jobs}"
export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
export DATABUILD_ANALYZE_BINARY=$(rlocation _main/{analyze_path})
export DATABUILD_EXECUTE_BINARY=$(rlocation _main/{exec_path})
export DATABUILD_SERVICE_BINARY=$(rlocation _main/{service_path})
""".format(
candidate_jobs = config_paths_str,
lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
analyze_path = ctx.attr.analyze.files_to_run.executable.short_path,
exec_path = ctx.attr.exec.files_to_run.executable.short_path,
service_path = ctx.attr._service.files_to_run.executable.short_path,
)
# Generate a custom script instead of using the template to handle the external binary correctly
script_content = RUNFILES_PREFIX + env_setup + """
EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/build_graph_service")"
# Always pass graph-specific configuration, allow user args to override defaults like port/host
# Graph-specific args that should always be set:
GRAPH_ARGS=(
@ -822,9 +848,9 @@ fi
# Run the service with graph-specific args + user args
if [[ -n "${{EXECUTABLE_SUBCOMMAND:-}}" ]]; then
exec "${{EXECUTABLE_BINARY}}" "${{EXECUTABLE_SUBCOMMAND}}" "${{GRAPH_ARGS[@]}}" "$@"
exec "${{DATABUILD_SERVICE_BINARY}}" "${{EXECUTABLE_SUBCOMMAND}}" "${{GRAPH_ARGS[@]}}" "$@"
else
exec "${{EXECUTABLE_BINARY}}" "${{GRAPH_ARGS[@]}}" "$@"
exec "${{DATABUILD_SERVICE_BINARY}}" "${{GRAPH_ARGS[@]}}" "$@"
fi
""".format(
graph_label = ctx.attr.graph_label,

View file

@ -305,15 +305,27 @@ impl BuildGraphService {
.layer(Extension(api))
.layer(axum::middleware::from_fn(Self::cors_middleware))
}
pub async fn openapi_spec(Extension(api): Extension<OpenApi>) -> Json<OpenApi> {
Json(api)
}
fn resolve_fpath(fpath: &str) -> String {
let standard_prefix = "databuild+";
let test_prefix = "_main";
match (
std::fs::read_dir(Self::get_runfile_path(&format!("{}/databuild/dashboard", standard_prefix))),
std::fs::read_dir(Self::get_runfile_path(&format!("{}/databuild/dashboard", test_prefix))),
) {
(Ok(_), _) => Self::get_runfile_path(&format!("{}/databuild/dashboard/{}", standard_prefix, fpath)),
(Err(_), Ok(_)) => Self::get_runfile_path(&format!("{}/databuild/dashboard/{}", test_prefix, fpath)),
(_, Err(_)) => panic!("Failed to find dashboard files"),
}
}
pub async fn serve_index() -> Response {
let index_path = Self::get_runfile_path("databuild+/databuild/dashboard/index.html");
match std::fs::read_to_string(&index_path) {
match std::fs::read_to_string(&Self::resolve_fpath("index.html")) {
Ok(content) => Response::builder()
.header("content-type", "text/html")
.body(content.into())
@ -326,9 +338,7 @@ impl BuildGraphService {
}
pub async fn serve_static(axum::extract::Path(file): axum::extract::Path<String>) -> Response {
let file_path = Self::get_runfile_path(&format!("databuild+/databuild/dashboard/{}", file));
match std::fs::read(file_path) {
match std::fs::read(&Self::resolve_fpath(&file)) {
Ok(content) => {
let content_type = match file.split('.').last() {
Some("html") => "text/html",
@ -352,6 +362,11 @@ impl BuildGraphService {
.unwrap(),
}
}
fn get_dashboard_file_path(relative_path: &str) -> String {
let runfiles_dir = std::env::var("DASHBOARD_FILES_DIR").unwrap();
format!("{}/{}", runfiles_dir, relative_path)
}
fn get_runfile_path(relative_path: &str) -> String {
if let Ok(runfiles_dir) = std::env::var("RUNFILES_DIR") {

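The `resolve_fpath` helper above probes the standard (`databuild+`) and test (`_main`) runfiles prefixes and serves dashboard assets from whichever directory exists. A simplified sketch of that probe; the `RUNFILES_DIR` join stands in for the real `get_runfile_path`, which has additional resolution logic not shown here:

```rust
// Simplified sketch of the two-prefix probe introduced above. Runfiles root
// resolution is reduced to RUNFILES_DIR for illustration.
use std::path::PathBuf;

fn runfile(relative: &str) -> PathBuf {
    let root = std::env::var("RUNFILES_DIR").unwrap_or_else(|_| ".".to_string());
    PathBuf::from(root).join(relative)
}

fn resolve_dashboard_file(fpath: &str) -> Result<PathBuf, String> {
    for prefix in ["databuild+", "_main"] {
        let dir = runfile(&format!("{}/databuild/dashboard", prefix));
        if dir.is_dir() {
            return Ok(dir.join(fpath));
        }
    }
    Err("Failed to find dashboard files".to_string())
}
```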
View file

@ -6,7 +6,7 @@ from pathlib import Path
def ref_path(ref: PartitionRef) -> str:
assert isinstance(ref, PartitionRef), f"Wanted PartitionRef, got `{type(ref)}`"
return "data/" + ref.str.lstrip("/") + "/data.json"
return "/tmp/data/" + ref.str.lstrip("/") + "/data.json"
def read(*refs: PartitionRef, empty_ok: bool=True) -> list[dict]:

View file

@ -102,32 +102,8 @@ def main():
elif command == "exec":
try:
# Read config from stdin
config_json = sys.stdin.read().strip()
if not config_json:
print("ERROR: No config provided on stdin", file=sys.stderr)
sys.exit(1)
config_dict = json.loads(config_json)
# Convert dict back to JobConfig
from databuild.proto import PartitionRef, DataDep, DepType
config = JobConfig(
outputs=[PartitionRef(str=ref['str']) for ref in config_dict['outputs']],
inputs=[
DataDep(
dep_type_code=dep['dep_type_code'],
dep_type_name=dep['dep_type_name'],
partition_ref=PartitionRef(str=dep['partition_ref']['str'])
) for dep in config_dict['inputs']
],
args=config_dict['args'],
env=config_dict['env'],
)
# Call job's exec method
job_instance.exec(config)
job_instance.exec(*sys.argv[2:])
except Exception as e:
print(f"ERROR: Execution failed: {e}", file=sys.stderr)

View file

@ -17,6 +17,7 @@ from databuild.test.app.dsl.partitions import (
Votes1MPartition,
ColorVoteReportPartition
)
import os
from datetime import date, timedelta
graph = DataBuildGraph("//databuild/test/app/dsl:dsl_graph")
@ -33,8 +34,8 @@ class IngestColorVotes(DataBuildJob):
configs.append(JobConfigBuilder().add_outputs(output).set_env(env).build())
return configs
def exec(self, config: JobConfig) -> None:
ingest_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
def exec(self, *args: str) -> None:
ingest_color_votes_exec(data_date=os.environ["DATA_DATE"], color=os.environ["COLOR"])
@graph.job
@ -68,8 +69,8 @@ class TrailingColorVotes(DataBuildJob):
configs.append(config.build())
return configs
def exec(self, config: JobConfig) -> None:
trailing_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
def exec(self, *args: str) -> None:
trailing_color_votes_exec(data_date=os.environ["DATA_DATE"], color=os.environ["COLOR"])
@graph.job
@ -98,8 +99,8 @@ class AggregateColorVotes(DataBuildJob):
return configs
def exec(self, config: JobConfig) -> None:
aggregate_color_votes_exec(data_date=config.env["DATA_DATE"], aggregate_type=config.env["AGGREGATE_TYPE"])
def exec(self, *args: str) -> None:
aggregate_color_votes_exec(data_date=os.environ["DATA_DATE"], aggregate_type=os.environ["AGGREGATE_TYPE"])
@graph.job
@ -125,6 +126,6 @@ class ColorVoteReportCalc(DataBuildJob):
return [config.build()]
def exec(self, config: JobConfig) -> None:
color_vote_report_calc_exec(config.args)
def exec(self, *args: str) -> None:
color_vote_report_calc_exec(list(args))