From eb26bd0274cb99d708215f881523a3fc7b1c2345 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Sun, 27 Jul 2025 00:15:12 -0700
Subject: [PATCH] Implement v0 of the new exec wrapper

---
 databuild/databuild.proto            |  48 +++++
 databuild/job/BUILD.bazel            |  16 +-
 databuild/job/execute_wrapper.sh.tpl |  53 ------
 databuild/job/src/main.rs            | 266 +++++++++++++++++++++++++++
 databuild/rules.bzl                  |  34 ++--
 5 files changed, 343 insertions(+), 74 deletions(-)
 delete mode 100755 databuild/job/execute_wrapper.sh.tpl
 create mode 100644 databuild/job/src/main.rs

diff --git a/databuild/databuild.proto b/databuild/databuild.proto
index a9a44ea..d3adee0 100644
--- a/databuild/databuild.proto
+++ b/databuild/databuild.proto
@@ -276,6 +276,54 @@ message BuildEvent {
   }
 }
 
+///////////////////////////////////////////////////////////////////////////////////////////////
+// Job Wrapper Log Protocol
+///////////////////////////////////////////////////////////////////////////////////////////////
+
+// Structured log entry emitted by the job wrapper to stdout
+message JobLogEntry {
+  string timestamp = 1;        // Unix timestamp
+  string job_id = 2;           // UUID for this job execution
+  string partition_ref = 3;    // Primary partition being processed
+  uint64 sequence_number = 4;  // Monotonic sequence starting from 1
+
+  oneof content {
+    LogMessage log = 5;
+    MetricPoint metric = 6;
+    WrapperJobEvent job_event = 7;  // Wrapper-specific job events
+    PartitionManifest manifest = 8;
+  }
+}
+
+// Log message from job stdout/stderr
+message LogMessage {
+  enum LogLevel {
+    DEBUG = 0;
+    INFO = 1;
+    WARN = 2;
+    ERROR = 3;
+  }
+  LogLevel level = 1;
+  string message = 2;
+  map<string, string> fields = 3;
+}
+
+// Metric point emitted by a job
+message MetricPoint {
+  string name = 1;
+  double value = 2;
+  map<string, string> labels = 3;
+  string unit = 4;
+}
+
+// Job wrapper event (distinct from the build event log's JobEvent)
+message WrapperJobEvent {
+  string event_type = 1;  // "config_validate_success", "task_launch_success", etc.
+  map<string, string> metadata = 2;
+  optional string job_status = 3;  // JobStatus enum as string
+  optional int32 exit_code = 4;
+}
+
 ///////////////////////////////////////////////////////////////////////////////////////////////
 // List Operations (Unified CLI/Service Responses)
 ///////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/databuild/job/BUILD.bazel b/databuild/job/BUILD.bazel
index 489aee6..f518325 100644
--- a/databuild/job/BUILD.bazel
+++ b/databuild/job/BUILD.bazel
@@ -1,3 +1,13 @@
-exports_files([
-    "execute_wrapper.sh.tpl",
-])
+load("@rules_rust//rust:defs.bzl", "rust_binary")
+
+rust_binary(
+    name = "job_wrapper",
+    srcs = ["src/main.rs"],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//databuild",
+        "@crates//:serde",
+        "@crates//:serde_json",
+        "@crates//:uuid",
+    ],
+)
diff --git a/databuild/job/execute_wrapper.sh.tpl b/databuild/job/execute_wrapper.sh.tpl
deleted file mode 100755
index 8b27fd9..0000000
--- a/databuild/job/execute_wrapper.sh.tpl
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-set -e
-
-%{RUNFILES_PREFIX}
-
-%{PREFIX}
-
-EXECUTE_BINARY="$(rlocation "_main/$(basename "%{EXECUTE_PATH}")")"
-JQ="$(rlocation "databuild+/databuild/runtime/$(basename "%{JQ_PATH}")")"
-
-# First argument should be the path to a config file
-CONFIG_FILE=${1:-}
-
-# Create a temporary file for stdin if needed
-if [[ -z "$CONFIG_FILE" ]] || [[ "$CONFIG_FILE" == "-" ]]; then
-    TMP_CONFIG=$(mktemp)
-    cat > "$TMP_CONFIG"
-    CONFIG_FILE="$TMP_CONFIG"
-    trap 'rm -f "$TMP_CONFIG"' EXIT
-fi
-
-# Use jq to validate the config file
-# First check if the file starts with { and ends with }
-if [[ $(head -c 1 "$CONFIG_FILE") != "{" ]] || [[ $(tail -c 2 "$CONFIG_FILE" | head -c 1) != "}" ]]; then
-    echo "The config file must be a non-empty JSON object:"
-    cat $CONFIG_FILE
-    exit 1
-fi
-
-# Then validate that it parses
-if ! $JQ 'type == "object"' $CONFIG_FILE > /dev/null 2>&1; then
-    echo "The config file must be a non-empty JSON object:"
-    cat $CONFIG_FILE
-    exit 1
-fi
-
-# Should be a single JSON object
-
-# Extract and set environment variables from the config
-eval "$("$JQ" -r '.env | to_entries | .[] | "export " + .key + "=\"" + .value + "\""' "$CONFIG_FILE")"
-
-# Extract arguments from the config
-ARGS=()
-while IFS= read -r arg; do
-    ARGS+=("$arg")
-done < <("$JQ" -r '.args[]' "$CONFIG_FILE")
-
-# Run the execution with both environment variables (already set) and arguments
-if [[ -n "${EXECUTE_SUBCOMMAND:-}" ]]; then
-    exec "$EXECUTE_BINARY" "${EXECUTE_SUBCOMMAND}" "${ARGS[@]}"
-else
-    exec "$EXECUTE_BINARY" "${ARGS[@]}"
-fi
diff --git a/databuild/job/src/main.rs b/databuild/job/src/main.rs
new file mode 100644
index 0000000..02fc1ac
--- /dev/null
+++ b/databuild/job/src/main.rs
@@ -0,0 +1,266 @@
+use std::env;
+use std::io::{self, Read, Write};
+use std::process::{Command, Stdio};
+use std::time::{SystemTime, UNIX_EPOCH};
+// All serialization handled by protobuf serde derives
+use serde_json;
+use uuid::Uuid;
+
+// Import protobuf types from databuild
+use databuild::{
+    PartitionRef, PartitionManifest, Task, JobLabel, JobConfig,
+    JobLogEntry, LogMessage, WrapperJobEvent, job_log_entry, log_message
+};
+
+// All types now come from protobuf - no custom structs needed
+
+struct JobWrapper {
+    job_id: String,
+    sequence_number: u64,
+    start_time: i64,
+}
+
+impl JobWrapper {
+    fn new() -> Self {
+        Self {
+            job_id: Uuid::new_v4().to_string(),
+            sequence_number: 0,
+            start_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs() as i64,
+        }
+    }
+
+    fn get_timestamp() -> String {
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs()
+            .to_string()
+    }
+
+    fn next_sequence(&mut self) -> u64 {
+        self.sequence_number += 1;
+        self.sequence_number
+    }
+
+    fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
+        let entry = JobLogEntry {
+            timestamp: Self::get_timestamp(),
+            job_id: self.job_id.clone(),
+            partition_ref: partition_ref.to_string(),
+            sequence_number: self.next_sequence(),
+            content: Some(content),
+        };
+
+        println!("{}", serde_json::to_string(&entry).unwrap());
+    }
+
+    fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
+        // Parse the partition ref from args (first argument)
+        let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();
+
+        // Following the state diagram: wrapper_validate_config -> emit_config_validate_success
+        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+            event_type: "config_validate_success".to_string(),
+            metadata: std::collections::HashMap::new(),
+            job_status: None,
+            exit_code: None,
+        }));
+
+        // For Phase 0, we still need to produce the expected JSON config format
+        // so the current graph system can parse it. Later phases will change this.
+        let config = JobConfig {
+            outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
+            inputs: vec![],
+            args: outputs.clone(),
+            env: {
+                let mut env_map = std::collections::HashMap::new();
+                if let Some(partition_ref) = outputs.first() {
+                    env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
+                }
+                env_map
+            },
+        };
+
+        // For config mode, we need to output the standard config format to stdout.
+        // The structured logs will come later, during exec mode.
+        let configs_wrapper = serde_json::json!({
+            "configs": [config]
+        });
+
+        println!("{}", serde_json::to_string(&configs_wrapper)?);
+
+        Ok(())
+    }
+
+    fn exec_mode(&mut self, job_binary: &str) -> Result<(), Box<dyn std::error::Error>> {
+        // Read the job config from stdin
+        let mut buffer = String::new();
+        io::stdin().read_to_string(&mut buffer)?;
+
+        let config: JobConfig = serde_json::from_str(&buffer)?;
+        let partition_ref = config.outputs.first()
+            .map(|p| p.str.clone())
+            .unwrap_or_else(|| "unknown".to_string());
+
+        // Following the state diagram:
+        // 1. wrapper_validate_config -> emit_config_validate_success
+        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+            event_type: "config_validate_success".to_string(),
+            job_status: None,
+            exit_code: None,
+            metadata: std::collections::HashMap::new(),
+        }));
+
+        // 2. wrapper_launch_task -> emit_task_launch_success
+        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+            event_type: "task_launch_success".to_string(),
+            job_status: None,
+            exit_code: None,
+            metadata: std::collections::HashMap::new(),
+        }));
+
+        // Execute the original job binary with the exec subcommand
+        let mut cmd = Command::new(job_binary);
+        cmd.arg("exec");
+
+        // Add the args from the config
+        for arg in &config.args {
+            cmd.arg(arg);
+        }
+
+        cmd.stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        // Set environment variables from the config
+        for (key, value) in &config.env {
+            cmd.env(key, value);
+        }
+
+        let mut child = cmd.spawn()?;
+
+        // Send the config to the job
+        if let Some(stdin) = child.stdin.as_mut() {
+            stdin.write_all(buffer.as_bytes())?;
+        }
+
+        let output = child.wait_with_output()?;
+        let success = output.status.success();
+        let exit_code = output.status.code().unwrap_or(-1);
+
+        // Capture and forward job stdout/stderr as log messages
+        if !output.stdout.is_empty() {
+            let stdout_str = String::from_utf8_lossy(&output.stdout);
+            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
+                level: log_message::LogLevel::Info as i32,
+                message: stdout_str.to_string(),
+                fields: std::collections::HashMap::new(),
+            }));
+        }
+
+        if !output.stderr.is_empty() {
+            let stderr_str = String::from_utf8_lossy(&output.stderr);
+            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
+                level: log_message::LogLevel::Error as i32,
+                message: stderr_str.to_string(),
+                fields: std::collections::HashMap::new(),
+            }));
+        }
+
+        if success {
+            // Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
+            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "task_success".to_string(),
+                job_status: Some("JOB_COMPLETED".to_string()),
+                exit_code: Some(exit_code),
+                metadata: std::collections::HashMap::new(),
+            }));
+
+            // Then emit_partition_manifest -> success
+            let end_time = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs() as i64;
+
+            self.emit_log(&partition_ref,
+                job_log_entry::Content::Manifest(PartitionManifest {
+                    outputs: config.outputs.clone(),
+                    inputs: vec![], // Phase 0: no input manifests yet
+                    start_time: self.start_time,
+                    end_time,
+                    task: Some(Task {
+                        job: Some(JobLabel {
+                            label: env::var("DATABUILD_JOB_LABEL").unwrap_or_else(|_| "unknown".to_string()),
+                        }),
+                        config: Some(config.clone()),
+                    }),
+                    metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
+                }));
+        } else {
+            // Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
+            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "task_failed".to_string(),
+                job_status: Some("JOB_FAILED".to_string()),
+                exit_code: Some(exit_code),
+                metadata: std::collections::HashMap::new(),
+            }));
+
+            // Then emit_job_exec_fail -> fail (don't emit a partition manifest on failure)
+            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "job_exec_fail".to_string(),
+                job_status: Some("JOB_FAILED".to_string()),
+                exit_code: Some(exit_code),
+                metadata: {
+                    let mut meta = std::collections::HashMap::new();
+                    meta.insert("error".to_string(), format!("Job failed with exit code {}", exit_code));
+                    meta
+                },
+            }));
+        }
+
+        // Forward the original job's output to stdout/stderr for compatibility
+        io::stdout().write_all(&output.stdout)?;
+        io::stderr().write_all(&output.stderr)?;
+
+        if !success {
+            std::process::exit(exit_code);
+        }
+
+        Ok(())
+    }
+}
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let args: Vec<String> = env::args().collect();
+
+    if args.len() < 2 {
+        eprintln!("Usage: job_wrapper <mode> [args...]");
+        std::process::exit(1);
+    }
+
+    let mode = &args[1];
+    let mut wrapper = JobWrapper::new();
+
+    match mode.as_str() {
+        "config" => {
+            let outputs = args[2..].to_vec();
+            wrapper.config_mode(outputs)?;
+        }
+        "exec" => {
+            // For exec mode, we need to know which original job binary to call.
+            // For Phase 0, we derive this from the environment or make it configurable.
+            let job_binary = env::var("DATABUILD_JOB_BINARY")
+                .unwrap_or_else(|_| "python3".to_string()); // Default fallback
+
+            wrapper.exec_mode(&job_binary)?;
+        }
+        _ => {
+            eprintln!("Unknown mode: {}", mode);
+            std::process::exit(1);
+        }
+    }
+
+    Ok(())
+}
\ No newline at end of file
diff --git a/databuild/rules.bzl b/databuild/rules.bzl
index 70b9fe8..59dc278 100644
--- a/databuild/rules.bzl
+++ b/databuild/rules.bzl
@@ -113,29 +113,31 @@ _databuild_job_cfg_rule = rule(
 )
 
 def _databuild_job_exec_impl(ctx):
     execute_file = ctx.executable.execute
-    jq_file = ctx.executable._jq
+    wrapper_file = ctx.executable._job_wrapper
 
     script = ctx.actions.declare_file(ctx.label.name)
 
     # Get the correct runfiles paths
-    jq_path = ctx.attr._jq.files_to_run.executable.path
+    wrapper_path = ctx.attr._job_wrapper.files_to_run.executable.path
     execute_path = ctx.attr.execute.files_to_run.executable.path
 
-    ctx.actions.expand_template(
-        template = ctx.file._template,
+    # Create a simple script that calls the job wrapper with the original binary
+    script_content = RUNFILES_PREFIX + """
+export DATABUILD_JOB_BINARY="$(rlocation _main/{execute_path})"
+exec "$(rlocation databuild+/databuild/job/job_wrapper)" exec "$@"
+    """.format(
+        execute_path = ctx.attr.execute.files_to_run.executable.short_path,
+    )
+
+    ctx.actions.write(
         output = script,
-        substitutions = {
-            "%{JQ_PATH}": jq_path,
-            "%{EXECUTE_PATH}": execute_path,
-            "%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
-            "%{PREFIX}": "EXECUTE_SUBCOMMAND=\"exec\"\n",
-        },
+        content = script_content,
         is_executable = True,
     )
 
     runfiles = ctx.runfiles(
-        files = [jq_file, execute_file],
-    ).merge(ctx.attr.execute.default_runfiles).merge(ctx.attr._jq.default_runfiles).merge(
+        files = [wrapper_file, execute_file],
+    ).merge(ctx.attr.execute.default_runfiles).merge(ctx.attr._job_wrapper.default_runfiles).merge(
         ctx.attr._bash_runfiles.default_runfiles,
     )
@@ -165,12 +167,8 @@ _databuild_job_exec_rule = rule(
             executable = True,
             cfg = "target",
         ),
-        "_template": attr.label(
-            default = "@databuild//databuild/job:execute_wrapper.sh.tpl",
-            allow_single_file = True,
-        ),
-        "_jq": attr.label(
-            default = "@databuild//databuild/runtime:jq",
+        "_job_wrapper": attr.label(
+            default = "@databuild//databuild/job:job_wrapper",
             executable = True,
             cfg = "target",
         ),
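
Reviewer note (not part of the patch): to make the new stdout protocol concrete, below is a minimal consumer sketch that reads a wrapper's output line by line and decodes each line as a JobLogEntry. It assumes, as main.rs itself does, that the prost-generated types in the `databuild` crate carry serde derives (with `serde_json` as a dependency) and that prost's usual oneof module convention (`job_log_entry::Content`) applies; treat the exact JSON field spellings as illustrative until pinned down by a test.

    use std::io::{self, BufRead};

    use databuild::{job_log_entry, JobLogEntry};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Each stdin line is one JSON-encoded JobLogEntry emitted by the wrapper.
        for line in io::stdin().lock().lines() {
            let entry: JobLogEntry = serde_json::from_str(&line?)?;
            match entry.content {
                Some(job_log_entry::Content::JobEvent(ev)) => {
                    eprintln!("{} seq={} event={}", entry.partition_ref, entry.sequence_number, ev.event_type)
                }
                Some(job_log_entry::Content::Manifest(m)) => {
                    eprintln!("{} manifest with {} output(s)", entry.partition_ref, m.outputs.len())
                }
                Some(job_log_entry::Content::Log(msg)) => {
                    eprintln!("{} log: {}", entry.partition_ref, msg.message)
                }
                Some(job_log_entry::Content::Metric(mp)) => {
                    eprintln!("{} metric {}={}", entry.partition_ref, mp.name, mp.value)
                }
                None => {}
            }
        }
        Ok(())
    }

Piping a `job_wrapper exec` run into such a reader should surface the config_validate_success -> task_launch_success -> task_success/task_failed sequence, followed by a PartitionManifest on success, matching the state diagram the wrapper comments reference.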