phase 1 - fix exec events not being written

This commit is contained in:
Stuart Axelbrooke 2025-07-06 14:58:29 -07:00
parent 410217481e
commit a5ad099951
6 changed files with 1191 additions and 100 deletions

View file

@ -253,7 +253,7 @@ async fn plan(
} }
// Check for partition staleness and delegation opportunities // Check for partition staleness and delegation opportunities
let (stale_refs, delegated_refs) = if let Some(ref event_log) = build_event_log { let (stale_refs, _delegated_refs) = if let Some(ref event_log) = build_event_log {
match check_partition_staleness(output_refs, event_log, build_request_id).await { match check_partition_staleness(output_refs, event_log, build_request_id).await {
Ok((stale, delegated)) => { Ok((stale, delegated)) => {
info!("Staleness check: {} stale, {} delegated partitions", stale.len(), delegated.len()); info!("Staleness check: {} stale, {} delegated partitions", stale.len(), delegated.len());
@ -548,24 +548,10 @@ async fn main() {
let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string()); let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
info!("Starting analyze.rs in mode: {}", mode); info!("Starting analyze.rs in mode: {}", mode);
// Parse command line arguments // Parse command line arguments (only for partition references)
let matches = ClapCommand::new("analyze") let matches = ClapCommand::new("analyze")
.version("1.0") .version("1.0")
.about("DataBuild graph analysis tool") .about("DataBuild graph analysis tool")
.arg(
Arg::new("build-event-log")
.long("build-event-log")
.value_name("URI")
.help("Build event log URI (stdout, sqlite://path, postgres://connection)")
.required(false)
)
.arg(
Arg::new("build-request-id")
.long("build-request-id")
.value_name("UUID")
.help("Build request ID (auto-generated if not provided)")
.required(false)
)
.arg( .arg(
Arg::new("partitions") Arg::new("partitions")
.help("Partition references to analyze") .help("Partition references to analyze")
@ -574,19 +560,19 @@ async fn main() {
) )
.get_matches(); .get_matches();
let build_event_log_uri = matches.get_one::<String>("build-event-log");
let build_request_id = matches.get_one::<String>("build-request-id")
.cloned()
.unwrap_or_else(|| Uuid::new_v4().to_string());
let args: Vec<String> = matches.get_many::<String>("partitions") let args: Vec<String> = matches.get_many::<String>("partitions")
.unwrap_or_default() .unwrap_or_default()
.cloned() .cloned()
.collect(); .collect();
// Get build event log configuration from environment variables
let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").ok();
let build_request_id = env::var("DATABUILD_BUILD_REQUEST_ID")
.unwrap_or_else(|_| Uuid::new_v4().to_string());
// Initialize build event log if provided // Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri { let build_event_log = if let Some(uri) = build_event_log_uri {
match create_build_event_log(uri).await { match create_build_event_log(&uri).await {
Ok(log) => { Ok(log) => {
info!("Initialized build event log: {}", uri); info!("Initialized build event log: {}", uri);
Some(log) Some(log)

View file

@ -10,7 +10,7 @@ use std::process::{Command, Stdio};
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use clap::{Arg, Command as ClapCommand}; // Command line parsing removed - using environment variables
use uuid::Uuid; use uuid::Uuid;
const NUM_WORKERS: usize = 4; const NUM_WORKERS: usize = 4;
@ -340,34 +340,14 @@ fn log_status_summary(
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
simple_logger::SimpleLogger::new().with_level(log::LevelFilter::Info).init()?; simple_logger::SimpleLogger::new().with_level(log::LevelFilter::Info).init()?;
// Parse command line arguments // Get build event log configuration from environment variables
let matches = ClapCommand::new("execute") let build_event_log_uri = std::env::var("DATABUILD_BUILD_EVENT_LOG").ok();
.version("1.0") let build_request_id = std::env::var("DATABUILD_BUILD_REQUEST_ID")
.about("DataBuild graph execution tool") .unwrap_or_else(|_| Uuid::new_v4().to_string());
.arg(
Arg::new("build-event-log")
.long("build-event-log")
.value_name("URI")
.help("Build event log URI (stdout, sqlite://path, postgres://connection)")
.required(false)
)
.arg(
Arg::new("build-request-id")
.long("build-request-id")
.value_name("UUID")
.help("Build request ID (auto-generated if not provided)")
.required(false)
)
.get_matches();
let build_event_log_uri = matches.get_one::<String>("build-event-log");
let build_request_id = matches.get_one::<String>("build-request-id")
.cloned()
.unwrap_or_else(|| Uuid::new_v4().to_string());
// Initialize build event log if provided // Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri { let build_event_log = if let Some(uri) = build_event_log_uri {
match create_build_event_log(uri).await { match create_build_event_log(&uri).await {
Ok(log) => { Ok(log) => {
info!("Initialized build event log: {}", uri); info!("Initialized build event log: {}", uri);
Some(log) Some(log)

View file

@ -9,5 +9,5 @@ set -e
# Assumes workspace name is 'databuild' # Assumes workspace name is 'databuild'
EXECUTABLE_BINARY="$(rlocation "databuild/databuild/graph/analyze")" EXECUTABLE_BINARY="$(rlocation "databuild/databuild/graph/analyze")"
# Run the analysis with optional build event log arguments # Run the analysis
exec "${EXECUTABLE_BINARY}" ${DATABUILD_BUILD_EVENT_LOG_ARGS} "$@" exec "${EXECUTABLE_BINARY}" "$@"

View file

@ -5,7 +5,7 @@ set -e
%{PREFIX} %{PREFIX}
EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")")" EXECUTABLE_BINARY="$(rlocation "databuild/databuild/graph/execute")"
# Run the execution with optional build event log arguments # Run the execution
exec "${EXECUTABLE_BINARY}" ${DATABUILD_BUILD_EVENT_LOG_ARGS} "$@" exec "${EXECUTABLE_BINARY}" "$@"

View file

@ -231,15 +231,16 @@ _databuild_job_rule = rule(
executable = True, executable = True,
) )
def databuild_graph(name, jobs, lookup, build_event_log = None, visibility = None): def databuild_graph(name, jobs, lookup, visibility = None):
"""Creates a databuild graph target. """Creates a databuild graph target.
Args: Args:
name: Name of the graph target name: Name of the graph target
jobs: List of job targets jobs: List of job targets
lookup: Job lookup binary lookup: Job lookup binary
build_event_log: Optional build event log URI (e.g., "sqlite:///tmp/builds.db", "stdout")
visibility: Visibility specification visibility: Visibility specification
Note: Build event logging is configured via the DATABUILD_BUILD_EVENT_LOG environment variable.
""" """
_databuild_graph_lookup( _databuild_graph_lookup(
name = "%s.lookup" % name, name = "%s.lookup" % name,
@ -250,7 +251,6 @@ def databuild_graph(name, jobs, lookup, build_event_log = None, visibility = Non
name = "%s.analyze" % name, name = "%s.analyze" % name,
lookup = "%s.lookup" % name, lookup = "%s.lookup" % name,
jobs = jobs, jobs = jobs,
build_event_log = build_event_log,
visibility = visibility, visibility = visibility,
) )
_databuild_graph_mermaid( _databuild_graph_mermaid(
@ -262,7 +262,6 @@ def databuild_graph(name, jobs, lookup, build_event_log = None, visibility = Non
_databuild_graph_exec( _databuild_graph_exec(
name = "%s.exec" % name, name = "%s.exec" % name,
jobs = jobs, jobs = jobs,
build_event_log = build_event_log,
visibility = visibility, visibility = visibility,
) )
_databuild_graph_build( _databuild_graph_build(
@ -270,7 +269,6 @@ def databuild_graph(name, jobs, lookup, build_event_log = None, visibility = Non
analyze = "%s.analyze" % name, analyze = "%s.analyze" % name,
exec = "%s.exec" % name, exec = "%s.exec" % name,
jobs = jobs, jobs = jobs,
build_event_log = build_event_log,
visibility = visibility, visibility = visibility,
) )
tar( tar(
@ -357,20 +355,13 @@ def _databuild_graph_analyze_impl(ctx):
for target in ctx.attr.jobs for target in ctx.attr.jobs
]) + "'" ]) + "'"
# Build the command with optional build event log
build_event_log_args = ""
if ctx.attr.build_event_log:
build_event_log_args = "--build-event-log '%s'" % ctx.attr.build_event_log
env_setup = """ env_setup = """
export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}" export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
export DATABUILD_MODE=plan export DATABUILD_MODE=plan
export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path}) export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
export DATABUILD_BUILD_EVENT_LOG_ARGS="{build_event_log_args}"
""".format( """.format(
candidate_job_env_var = config_paths_str, candidate_job_env_var = config_paths_str,
lookup_path = ctx.attr.lookup.files_to_run.executable.short_path, lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
build_event_log_args = build_event_log_args,
) )
script_prefix = env_setup script_prefix = env_setup
@ -424,10 +415,6 @@ _databuild_graph_analyze = rule(
doc = "The list of jobs that are candidates for building partitions in this databuild graph", doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False, allow_empty = False,
), ),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
"_template": attr.label( "_template": attr.label(
default = "@databuild//databuild/graph:rust_analyze_wrapper.sh.tpl", default = "@databuild//databuild/graph:rust_analyze_wrapper.sh.tpl",
allow_single_file = True, allow_single_file = True,
@ -545,16 +532,8 @@ def _databuild_graph_exec_impl(ctx):
for job in ctx.attr.jobs for job in ctx.attr.jobs
] ]
# Build the command with optional build event log
build_event_log_args = ""
if ctx.attr.build_event_log:
build_event_log_args = "--build-event-log '%s'" % ctx.attr.build_event_log
prefix_setup = """ prefix_setup = """
export DATABUILD_BUILD_EVENT_LOG_ARGS="{build_event_log_args}" """
""".format(
build_event_log_args = build_event_log_args,
)
ctx.actions.expand_template( ctx.actions.expand_template(
template = ctx.file._template, template = ctx.file._template,
@ -593,10 +572,6 @@ _databuild_graph_exec = rule(
doc = "The list of jobs that are candidates for building partitions in this databuild graph", doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False, allow_empty = False,
), ),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
"_template": attr.label( "_template": attr.label(
default = "@databuild//databuild/graph:rust_execute_wrapper.sh.tpl", default = "@databuild//databuild/graph:rust_execute_wrapper.sh.tpl",
allow_single_file = True, allow_single_file = True,
@ -673,10 +648,6 @@ _databuild_graph_build = rule(
doc = "The list of jobs that are candidates for building partitions in this databuild graph", doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False, allow_empty = False,
), ),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
}, },
executable = True, executable = True,
) )

File diff suppressed because one or more lines are too long