Compare commits

...

8 commits

11 changed files with 1725 additions and 373 deletions

View file

@ -90,11 +90,15 @@ def lookup_job_for_partition(partition_ref: str) -> str:
```
### Common Pitfalls
- **Not using protobuf-defined interface**: Where structs and interfaces are defined centrally in [`databuild.proto`](./databuild/databuild.proto), those interfaces should always be used. E.g., in rust depending on them via the prost-generated structs, and in the web app via the OpenAPI-generated typescript interfaces.
- **Empty args**: Jobs with `"args": []` won't execute properly
- **Wrong target refs**: Job lookup must return base targets, not `.cfg` variants
- **Missing partition refs**: All outputs must be addressable via partition references
- **Not adding new generated files to OpenAPI outs**: Bazel hermeticity demands that we specify each output file, so when the OpenAPI code gen would create new files, we need to explicitly add them to the target's outs field.
## Notes / Tips
- Rust dependencies are implemented via rules_rust, so new dependencies should be added in the `MODULE.bazel` file.
## Documentation
We use plans / designs in the [plans](./plans/) directory to anchor most large scale efforts. We create plans that are good bets, though not necessarily exhaustive, then (and this is critical) we update them after the work is completed, or after significant progress towards completion.

View file

@ -131,6 +131,10 @@ crate.spec(
package = "rust-embed",
version = "8.0",
)
crate.spec(
package = "sysinfo",
version = "0.30",
)
crate.from_specs()
use_repo(crate, "crates")

File diff suppressed because one or more lines are too long

View file

@ -276,6 +276,54 @@ message BuildEvent {
}
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Job Wrapper Log Protocol
///////////////////////////////////////////////////////////////////////////////////////////////
// Structured log entry emitted by job wrapper to stdout
message JobLogEntry {
string timestamp = 1; // Unix timestamp
string job_id = 2; // UUID for this job execution
string partition_ref = 3; // Primary partition being processed
uint64 sequence_number = 4; // Monotonic sequence starting from 1
oneof content {
LogMessage log = 5;
MetricPoint metric = 6;
WrapperJobEvent job_event = 7; // Wrapper-specific job events
PartitionManifest manifest = 8;
}
}
// Log message from job stdout/stderr
message LogMessage {
enum LogLevel {
DEBUG = 0;
INFO = 1;
WARN = 2;
ERROR = 3;
}
LogLevel level = 1;
string message = 2;
map<string, string> fields = 3;
}
// Metric point emitted by job
message MetricPoint {
string name = 1;
double value = 2;
map<string, string> labels = 3;
string unit = 4;
}
// Job wrapper event (distinct from build event log JobEvent)
message WrapperJobEvent {
string event_type = 1; // "config_validate_success", "task_launch_success", etc
map<string, string> metadata = 2;
optional string job_status = 3; // JobStatus enum as string
optional int32 exit_code = 4;
}
///////////////////////////////////////////////////////////////////////////////////////////////
// List Operations (Unified CLI/Service Responses)
///////////////////////////////////////////////////////////////////////////////////////////////

View file

@ -1,3 +1,27 @@
exports_files([
"execute_wrapper.sh.tpl",
])
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test")
rust_binary(
name = "job_wrapper",
srcs = ["main.rs"],
visibility = ["//visibility:public"],
deps = [
"//databuild",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:uuid",
"@crates//:sysinfo",
],
)
rust_test(
name = "job_wrapper_test",
srcs = ["main.rs"],
deps = [
"//databuild",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:uuid",
"@crates//:sysinfo",
"@crates//:tempfile",
],
)

View file

@ -1,53 +0,0 @@
#!/bin/bash
set -e
%{RUNFILES_PREFIX}
%{PREFIX}
EXECUTE_BINARY="$(rlocation "_main/$(basename "%{EXECUTE_PATH}")")"
JQ="$(rlocation "databuild+/databuild/runtime/$(basename "%{JQ_PATH}")")"
# First argument should be the path to a config file
CONFIG_FILE=${1:-}
# Create a temporary file for stdin if needed
if [[ -z "$CONFIG_FILE" ]] || [[ "$CONFIG_FILE" == "-" ]]; then
TMP_CONFIG=$(mktemp)
cat > "$TMP_CONFIG"
CONFIG_FILE="$TMP_CONFIG"
trap 'rm -f "$TMP_CONFIG"' EXIT
fi
# Use jq to validate the config file
# First check if the file starts with { and ends with }
if [[ $(head -c 1 "$CONFIG_FILE") != "{" ]] || [[ $(tail -c 2 "$CONFIG_FILE" | head -c 1) != "}" ]]; then
echo "The config file must be a non-empty JSON object:"
cat $CONFIG_FILE
exit 1
fi
# Then validate that it parses
if ! $JQ 'type == "object"' $CONFIG_FILE > /dev/null 2>&1; then
echo "The config file must be a non-empty JSON object:"
cat $CONFIG_FILE
exit 1
fi
# Should be a single JSON object
# Extract and set environment variables from the config
eval "$("$JQ" -r '.env | to_entries | .[] | "export " + .key + "=\"" + .value + "\""' "$CONFIG_FILE")"
# Extract arguments from the config
ARGS=()
while IFS= read -r arg; do
ARGS+=("$arg")
done < <("$JQ" -r '.args[]' "$CONFIG_FILE")
# Run the execution with both environment variables (already set) and arguments
if [[ -n "${EXECUTE_SUBCOMMAND:-}" ]]; then
exec "$EXECUTE_BINARY" "${EXECUTE_SUBCOMMAND}" "${ARGS[@]}"
else
exec "$EXECUTE_BINARY" "${ARGS[@]}"
fi

975
databuild/job/main.rs Normal file
View file

@ -0,0 +1,975 @@
use std::env;
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// All serialization handled by protobuf serde derives
use serde_json;
use sysinfo::{Pid, ProcessRefreshKind, System};
use uuid::Uuid;
// Import protobuf types from databuild
use databuild::{
job_log_entry, log_message, JobConfig, JobLabel, JobLogEntry, LogMessage, PartitionManifest,
PartitionRef, Task, WrapperJobEvent,
};
// All types now come from protobuf - no custom structs needed
// Configuration constants
const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000; // 30 seconds
const DEFAULT_METRICS_INTERVAL_MS: u64 = 100; // 100 milliseconds
const TEST_HEARTBEAT_INTERVAL_MS: u64 = 100; // Fast heartbeats for testing
const TEST_METRICS_INTERVAL_MS: u64 = 50; // Fast metrics for testing
#[derive(Debug)]
struct HeartbeatMessage {
entry: JobLogEntry,
}
fn get_timestamp() -> String {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.to_string()
}
trait LogSink {
fn emit(&mut self, entry: JobLogEntry);
}
struct StdoutSink;
impl LogSink for StdoutSink {
fn emit(&mut self, entry: JobLogEntry) {
println!("{}", serde_json::to_string(&entry).unwrap());
}
}
struct JobWrapper<S: LogSink> {
job_id: String,
sequence_number: u64,
start_time: i64,
sink: S,
}
impl JobWrapper<StdoutSink> {
fn new() -> Self {
Self::new_with_sink(StdoutSink)
}
}
impl<S: LogSink> JobWrapper<S> {
fn new_with_sink(sink: S) -> Self {
Self {
job_id: Uuid::new_v4().to_string(),
sequence_number: 0,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
sink,
}
}
fn next_sequence(&mut self) -> u64 {
self.sequence_number += 1;
self.sequence_number
}
fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
let entry = JobLogEntry {
timestamp: get_timestamp(),
job_id: self.job_id.clone(),
partition_ref: partition_ref.to_string(),
sequence_number: self.next_sequence(),
content: Some(content),
};
self.sink.emit(entry);
}
fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
// Parse the partition ref from args (first argument)
let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();
// Following the state diagram: wrapper_validate_config -> emit_config_validate_success
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "config_validate_success".to_string(),
metadata: std::collections::HashMap::new(),
job_status: None,
exit_code: None,
}),
);
// For Phase 0, we still need to produce the expected JSON config format
// so the current graph system can parse it. Later phases will change this.
let config = JobConfig {
outputs: outputs
.iter()
.map(|s| PartitionRef { r#str: s.clone() })
.collect(),
inputs: vec![],
args: outputs.clone(),
env: {
let mut env_map = std::collections::HashMap::new();
if let Some(partition_ref) = outputs.first() {
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
}
env_map
},
};
// For config mode, we need to output the standard config format to stdout
// The structured logs will come later during exec mode
let configs_wrapper = serde_json::json!({
"configs": [config]
});
println!("{}", serde_json::to_string(&configs_wrapper)?);
Ok(())
}
fn exec_mode(&mut self, job_binary: &str) -> Result<(), Box<dyn std::error::Error>> {
// Read the job config from stdin
let mut buffer = String::new();
io::stdin().read_to_string(&mut buffer)?;
let config: JobConfig = serde_json::from_str(&buffer)?;
self.exec_mode_with_config(job_binary, config)
}
fn exec_mode_with_config(
&mut self,
job_binary: &str,
config: JobConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let partition_ref = config
.outputs
.first()
.map(|p| p.str.clone())
.unwrap_or_else(|| "unknown".to_string());
// Following the state diagram:
// 1. wrapper_validate_config -> emit_config_validate_success
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "config_validate_success".to_string(),
job_status: None,
exit_code: None,
metadata: std::collections::HashMap::new(),
}),
);
// 2. wrapper_launch_task -> emit_task_launch_success
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_launch_success".to_string(),
job_status: None,
exit_code: None,
metadata: std::collections::HashMap::new(),
}),
);
// Execute the original job binary with the exec subcommand
let mut cmd = Command::new(job_binary);
cmd.arg("exec");
// Add the args from the config
for arg in &config.args {
cmd.arg(arg);
}
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
// Set environment variables from config
for (key, value) in &config.env {
cmd.env(key, value);
}
let mut child = cmd.spawn()?;
let child_pid = child.id();
// Send the config to the job
if let Some(stdin) = child.stdin.as_mut() {
stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?;
}
// Start heartbeat thread with channel communication
let heartbeat_job_id = self.job_id.clone();
let heartbeat_partition_ref = partition_ref.clone();
let heartbeat_sequence = Arc::new(Mutex::new(0u64));
let heartbeat_sequence_clone = heartbeat_sequence.clone();
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<HeartbeatMessage>();
let heartbeat_handle = thread::spawn(move || {
let mut system = System::new_all();
let pid = Pid::from(child_pid as usize);
let heartbeat_interval_ms = env::var("DATABUILD_HEARTBEAT_INTERVAL_MS")
.unwrap_or_else(|_| DEFAULT_HEARTBEAT_INTERVAL_MS.to_string())
.parse::<u64>()
.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL_MS);
loop {
thread::sleep(Duration::from_millis(heartbeat_interval_ms));
// Refresh process info
system.refresh_processes_specifics(ProcessRefreshKind::new());
// Check if process still exists
if let Some(process) = system.process(pid) {
let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
let cpu_percent = process.cpu_usage();
// Create heartbeat event with metrics
let mut metadata = std::collections::HashMap::new();
metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
metadata.insert(
"cpu_usage_percent".to_string(),
format!("{:.3}", cpu_percent),
);
// Get next sequence number for heartbeat
let seq = {
let mut seq_lock = heartbeat_sequence_clone.lock().unwrap();
*seq_lock += 1;
*seq_lock
};
let heartbeat_event = JobLogEntry {
timestamp: get_timestamp(),
job_id: heartbeat_job_id.clone(),
partition_ref: heartbeat_partition_ref.clone(),
sequence_number: seq,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "heartbeat".to_string(),
job_status: None,
exit_code: None,
metadata,
})),
};
// Send heartbeat through channel instead of printing directly
if heartbeat_tx.send(HeartbeatMessage { entry: heartbeat_event }).is_err() {
// Main thread dropped receiver, exit
break;
}
} else {
// Process no longer exists, exit heartbeat thread
break;
}
}
});
// Track metrics while job is running
let job_start_time = SystemTime::now();
let mut system = System::new();
let pid = Pid::from(child_pid as usize);
// Initial refresh to establish baseline for CPU measurements
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
let mut peak_memory_mb = 0.0f64;
let mut cpu_samples = Vec::new();
let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new();
// Sleep briefly to allow the process to start up before measuring
let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
.unwrap_or_else(|_| DEFAULT_METRICS_INTERVAL_MS.to_string())
.parse::<u64>()
.unwrap_or(DEFAULT_METRICS_INTERVAL_MS);
thread::sleep(Duration::from_millis(sample_interval_ms));
// Poll process status and metrics
let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
// Check if process has exited
match child.try_wait()? {
Some(status) => {
// Process has exited, collect any remaining output
if let Some(mut stdout) = child.stdout.take() {
stdout.read_to_end(&mut stdout_buffer)?;
}
if let Some(mut stderr) = child.stderr.take() {
stderr.read_to_end(&mut stderr_buffer)?;
}
// Calculate final metrics
let job_duration = job_start_time.elapsed().map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Time calculation error: {}", e),
)
})?;
// Calculate CPU time: average CPU percentage * wall-clock time
let total_cpu_ms = if cpu_samples.is_empty() {
0.0
} else {
let avg_cpu_percent =
cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
(avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
};
// Stop heartbeat thread
drop(heartbeat_handle);
// Process any remaining heartbeat messages
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Update sequence number to account for heartbeats
let heartbeat_count = heartbeat_sequence.lock().unwrap();
self.sequence_number = self.sequence_number.max(*heartbeat_count);
drop(heartbeat_count);
// Create output struct to match original behavior
let output = std::process::Output {
status,
stdout: stdout_buffer,
stderr: stderr_buffer,
};
break (output, peak_memory_mb, total_cpu_ms, job_duration);
}
None => {
// Check for heartbeat messages and emit them
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Process still running, collect metrics
// Refresh CPU info and processes
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
// Sleep to allow CPU measurement interval
thread::sleep(Duration::from_millis(sample_interval_ms));
// Refresh again to get updated CPU usage
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
if let Some(process) = system.process(pid) {
let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
peak_memory_mb = peak_memory_mb.max(memory_mb);
let cpu_usage = process.cpu_usage();
cpu_samples.push(cpu_usage);
}
}
}
};
let success = output.status.success();
let exit_code = output.status.code().unwrap_or(-1);
// Capture and forward job stdout/stderr as log messages
if !output.stdout.is_empty() {
let stdout_str = String::from_utf8_lossy(&output.stdout);
self.emit_log(
&partition_ref,
job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Info as i32,
message: stdout_str.to_string(),
fields: std::collections::HashMap::new(),
}),
);
}
if !output.stderr.is_empty() {
let stderr_str = String::from_utf8_lossy(&output.stderr);
self.emit_log(
&partition_ref,
job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Error as i32,
message: stderr_str.to_string(),
fields: std::collections::HashMap::new(),
}),
);
}
// Emit job summary with resource metrics
let mut summary_metadata = std::collections::HashMap::new();
summary_metadata.insert(
"runtime_ms".to_string(),
format!("{:.3}", job_duration.as_millis() as f64),
);
summary_metadata.insert(
"peak_memory_mb".to_string(),
format!("{:.3}", peak_memory_mb),
);
summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_summary".to_string(),
job_status: None,
exit_code: Some(exit_code),
metadata: summary_metadata,
}),
);
if success {
// Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(exit_code),
metadata: std::collections::HashMap::new(),
}),
);
// Then emit_partition_manifest -> success
let end_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
self.emit_log(
&partition_ref,
job_log_entry::Content::Manifest(PartitionManifest {
outputs: config.outputs.clone(),
inputs: vec![], // Phase 0: no input manifests yet
start_time: self.start_time,
end_time,
task: Some(Task {
job: Some(JobLabel {
label: env::var("DATABUILD_JOB_LABEL")
.unwrap_or_else(|_| "unknown".to_string()),
}),
config: Some(config.clone()),
}),
metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
}),
);
} else {
// Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_failed".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(exit_code),
metadata: std::collections::HashMap::new(),
}),
);
// Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
self.emit_log(
&partition_ref,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_exec_fail".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(exit_code),
metadata: {
let mut meta = std::collections::HashMap::new();
meta.insert(
"error".to_string(),
format!("Job failed with exit code {}", exit_code),
);
meta
},
}),
);
}
// Forward the original job's output to stdout for compatibility
io::stdout().write_all(&output.stdout)?;
io::stderr().write_all(&output.stderr)?;
if !success {
std::process::exit(exit_code);
}
Ok(())
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: job_wrapper <config|exec> [args...]");
std::process::exit(1);
}
let mode = &args[1];
let mut wrapper = JobWrapper::new();
match mode.as_str() {
"config" => {
let outputs = args[2..].to_vec();
wrapper.config_mode(outputs)?;
}
"exec" => {
// For exec mode, we need to know which original job binary to call
// For Phase 0, we'll derive this from environment or make it configurable
let job_binary =
env::var("DATABUILD_JOB_BINARY").unwrap_or_else(|_| "python3".to_string()); // Default fallback
wrapper.exec_mode(&job_binary)?;
}
_ => {
eprintln!("Unknown mode: {}", mode);
std::process::exit(1);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// Test infrastructure
struct TestSink {
entries: Vec<JobLogEntry>,
}
impl TestSink {
fn new() -> Self {
Self {
entries: Vec::new(),
}
}
fn find_event(&self, event_type: &str) -> Option<&JobLogEntry> {
self.entries.iter().find(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == event_type
} else {
false
}
})
}
}
impl LogSink for TestSink {
fn emit(&mut self, entry: JobLogEntry) {
self.entries.push(entry);
}
}
// Helper functions for testing
fn generate_test_config(outputs: &[String]) -> JobConfig {
JobConfig {
outputs: outputs
.iter()
.map(|s| PartitionRef { r#str: s.clone() })
.collect(),
inputs: vec![],
args: outputs.to_vec(),
env: {
let mut env_map = std::collections::HashMap::new();
if let Some(partition_ref) = outputs.first() {
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
}
env_map
},
}
}
#[test]
fn test_job_log_entry_serialization() {
let entry = JobLogEntry {
timestamp: "1234567890".to_string(),
job_id: "test-id".to_string(),
partition_ref: "test/partition".to_string(),
sequence_number: 1,
content: Some(job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Info as i32,
message: "test message".to_string(),
fields: std::collections::HashMap::new(),
})),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"timestamp\":\"1234567890\""));
assert!(json.contains("\"sequence_number\":1"));
assert!(json.contains("\"Log\":{")); // Capitalized field name
assert!(json.contains("\"message\":\"test message\""));
}
#[test]
fn test_sequence_number_increment() {
let mut wrapper = JobWrapper::new();
assert_eq!(wrapper.next_sequence(), 1);
assert_eq!(wrapper.next_sequence(), 2);
assert_eq!(wrapper.next_sequence(), 3);
}
#[test]
fn test_config_mode_output_format() {
let outputs = vec!["test/partition".to_string()];
let config = generate_test_config(&outputs);
// Verify it produces expected structure
assert_eq!(config.outputs.len(), 1);
assert_eq!(config.outputs[0].r#str, "test/partition");
assert_eq!(config.args, outputs);
assert_eq!(
config.env.get("PARTITION_REF"),
Some(&"test/partition".to_string())
);
}
#[test]
fn test_multiple_outputs_config() {
let outputs = vec![
"reviews/date=2025-01-01".to_string(),
"reviews/date=2025-01-02".to_string(),
];
let config = generate_test_config(&outputs);
assert_eq!(config.outputs.len(), 2);
assert_eq!(config.outputs[0].r#str, "reviews/date=2025-01-01");
assert_eq!(config.outputs[1].r#str, "reviews/date=2025-01-02");
// First output is used as PARTITION_REF
assert_eq!(
config.env.get("PARTITION_REF"),
Some(&"reviews/date=2025-01-01".to_string())
);
}
#[test]
fn test_wrapper_job_event_creation() {
// Test success event
let event = WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_success");
assert_eq!(event.job_status, Some("JOB_COMPLETED".to_string()));
assert_eq!(event.exit_code, Some(0));
// Test failure event
let event = WrapperJobEvent {
event_type: "task_failed".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(1),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_failed");
assert_eq!(event.job_status, Some("JOB_FAILED".to_string()));
assert_eq!(event.exit_code, Some(1));
}
#[test]
fn test_log_message_levels() {
let info_log = LogMessage {
level: log_message::LogLevel::Info as i32,
message: "info message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(info_log.level, log_message::LogLevel::Info as i32);
let error_log = LogMessage {
level: log_message::LogLevel::Error as i32,
message: "error message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(error_log.level, log_message::LogLevel::Error as i32);
}
#[test]
fn test_partition_manifest_structure() {
let config = generate_test_config(&vec!["test/partition".to_string()]);
let manifest = PartitionManifest {
outputs: config.outputs.clone(),
inputs: vec![],
start_time: 1234567890,
end_time: 1234567900,
task: Some(Task {
job: Some(JobLabel {
label: "//test:job".to_string(),
}),
config: Some(config),
}),
metadata: std::collections::HashMap::new(),
};
assert_eq!(manifest.outputs.len(), 1);
assert_eq!(manifest.outputs[0].r#str, "test/partition");
assert_eq!(manifest.end_time - manifest.start_time, 10);
assert!(manifest.task.is_some());
}
#[test]
fn test_timestamp_generation() {
let ts1 = get_timestamp();
std::thread::sleep(std::time::Duration::from_millis(10));
let ts2 = get_timestamp();
// Timestamps should be parseable as integers
let t1: u64 = ts1.parse().expect("Should be valid timestamp");
let t2: u64 = ts2.parse().expect("Should be valid timestamp");
// Second timestamp should be equal or greater
assert!(t2 >= t1);
}
#[test]
fn test_job_wrapper_initialization() {
let wrapper = JobWrapper::new();
assert_eq!(wrapper.sequence_number, 0);
assert!(!wrapper.job_id.is_empty());
assert!(wrapper.start_time > 0);
}
#[test]
fn test_cpu_metrics_are_captured() {
use std::io::Write;
use tempfile::NamedTempFile;
// Create a CPU-intensive test script
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
let script_content = r#"#!/usr/bin/env python3
import sys
import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config":
config = {
"outputs": [{"str": "test/cpu"}],
"inputs": [],
"args": [],
"env": {"PARTITION_REF": "test/cpu"}
}
print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# CPU-intensive work that runs longer
start_time = time.time()
total = 0
while time.time() - start_time < 0.5: # Run for at least 500ms
total += sum(range(1_000_000))
print(f"Sum: {total}")
"#;
temp_file
.write_all(script_content.as_bytes())
.expect("Failed to write script");
let script_path = temp_file.path().to_str().unwrap();
// Make script executable
std::fs::set_permissions(
script_path,
std::os::unix::fs::PermissionsExt::from_mode(0o755),
)
.expect("Failed to set permissions");
// Set up environment for fast sampling and the test script
env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); // Even faster for CPU test
env::set_var("DATABUILD_JOB_BINARY", script_path);
// Create test sink and wrapper
let sink = TestSink::new();
let mut wrapper = JobWrapper::new_with_sink(sink);
// Create a JobConfig for the test
let config = JobConfig {
outputs: vec![PartitionRef {
r#str: "test/cpu".to_string(),
}],
inputs: vec![],
args: vec![],
env: {
let mut env_map = std::collections::HashMap::new();
env_map.insert("PARTITION_REF".to_string(), "test/cpu".to_string());
env_map
},
};
// We need to simulate stdin for exec_mode - let's create a test-specific exec method
// that takes the config directly rather than reading from stdin
let result = wrapper.exec_mode_with_config(script_path, config);
// Clean up environment
env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
env::remove_var("DATABUILD_JOB_BINARY");
// Check that exec_mode succeeded
if let Err(e) = &result {
println!("exec_mode failed with error: {}", e);
}
assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
// Find the job_summary event
let summary_event = wrapper
.sink
.find_event("job_summary")
.expect("Should have job_summary event");
if let Some(job_log_entry::Content::JobEvent(event)) = &summary_event.content {
// Verify we have CPU metrics
let cpu_ms_str = event
.metadata
.get("total_cpu_ms")
.expect("Should have total_cpu_ms metric");
let cpu_ms: f64 = cpu_ms_str
.parse()
.expect("CPU metric should be valid float");
// For CPU-intensive work, we should get non-zero CPU time
assert!(
cpu_ms > 0.0,
"Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms",
cpu_ms
);
// Also verify runtime is reasonable
let runtime_ms_str = event
.metadata
.get("runtime_ms")
.expect("Should have runtime_ms metric");
let runtime_ms: f64 = runtime_ms_str
.parse()
.expect("Runtime metric should be valid float");
assert!(runtime_ms > 0.0, "Should have non-zero runtime");
println!(
"CPU test results: {:.3}ms CPU time over {:.3}ms runtime",
cpu_ms, runtime_ms
);
} else {
panic!("job_summary event should contain JobEvent");
}
}
#[test]
fn test_heartbeat_functionality() {
use std::io::Write;
use tempfile::NamedTempFile;
// Create a longer-running test script to trigger heartbeats
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
let script_content = r#"#!/usr/bin/env python3
import sys
import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config":
config = {
"outputs": [{"str": "test/heartbeat"}],
"inputs": [],
"args": [],
"env": {"PARTITION_REF": "test/heartbeat"}
}
print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# Sleep long enough to trigger at least 2 heartbeats
time.sleep(0.3) # 300ms with 100ms heartbeat interval should give us 2-3 heartbeats
print("Job completed")
"#;
temp_file
.write_all(script_content.as_bytes())
.expect("Failed to write script");
let script_path = temp_file.path().to_str().unwrap();
// Make script executable
std::fs::set_permissions(
script_path,
std::os::unix::fs::PermissionsExt::from_mode(0o755),
)
.expect("Failed to set permissions");
// Set up environment for fast heartbeats and the test script
env::set_var("DATABUILD_HEARTBEAT_INTERVAL_MS", &TEST_HEARTBEAT_INTERVAL_MS.to_string());
env::set_var("DATABUILD_METRICS_INTERVAL_MS", &TEST_METRICS_INTERVAL_MS.to_string());
env::set_var("DATABUILD_JOB_BINARY", script_path);
// Create test sink and wrapper
let sink = TestSink::new();
let mut wrapper = JobWrapper::new_with_sink(sink);
// Create a JobConfig for the test
let config = JobConfig {
outputs: vec![PartitionRef {
r#str: "test/heartbeat".to_string(),
}],
inputs: vec![],
args: vec![],
env: {
let mut env_map = std::collections::HashMap::new();
env_map.insert("PARTITION_REF".to_string(), "test/heartbeat".to_string());
env_map
},
};
// Run the job
let result = wrapper.exec_mode_with_config(script_path, config);
// Clean up environment
env::remove_var("DATABUILD_HEARTBEAT_INTERVAL_MS");
env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
env::remove_var("DATABUILD_JOB_BINARY");
// Check that exec_mode succeeded
assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
// Count heartbeat events
let heartbeat_count = wrapper
.sink
.entries
.iter()
.filter(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == "heartbeat"
} else {
false
}
})
.count();
// We should have at least 1 heartbeat event (possibly 2-3 depending on timing)
assert!(
heartbeat_count >= 1,
"Expected at least 1 heartbeat event, but got {}",
heartbeat_count
);
// Verify heartbeat event structure
let heartbeat_event = wrapper
.sink
.entries
.iter()
.find(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == "heartbeat"
} else {
false
}
})
.expect("Should have at least one heartbeat event");
if let Some(job_log_entry::Content::JobEvent(event)) = &heartbeat_event.content {
// Verify heartbeat contains memory and CPU metrics
assert!(
event.metadata.contains_key("memory_usage_mb"),
"Heartbeat should contain memory_usage_mb"
);
assert!(
event.metadata.contains_key("cpu_usage_percent"),
"Heartbeat should contain cpu_usage_percent"
);
}
}
}

View file

@ -113,29 +113,31 @@ _databuild_job_cfg_rule = rule(
def _databuild_job_exec_impl(ctx):
execute_file = ctx.executable.execute
jq_file = ctx.executable._jq
wrapper_file = ctx.executable._job_wrapper
script = ctx.actions.declare_file(ctx.label.name)
# Get the correct runfiles paths
jq_path = ctx.attr._jq.files_to_run.executable.path
wrapper_path = ctx.attr._job_wrapper.files_to_run.executable.path
execute_path = ctx.attr.execute.files_to_run.executable.path
ctx.actions.expand_template(
template = ctx.file._template,
# Create a simple script that calls the job wrapper with the original binary
script_content = RUNFILES_PREFIX + """
export DATABUILD_JOB_BINARY="$(rlocation _main/{execute_path})"
exec "$(rlocation databuild+/databuild/job/job_wrapper)" exec "$@"
""".format(
execute_path = ctx.attr.execute.files_to_run.executable.short_path,
)
ctx.actions.write(
output = script,
substitutions = {
"%{JQ_PATH}": jq_path,
"%{EXECUTE_PATH}": execute_path,
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
"%{PREFIX}": "EXECUTE_SUBCOMMAND=\"exec\"\n",
},
content = script_content,
is_executable = True,
)
runfiles = ctx.runfiles(
files = [jq_file, execute_file],
).merge(ctx.attr.execute.default_runfiles).merge(ctx.attr._jq.default_runfiles).merge(
files = [wrapper_file, execute_file],
).merge(ctx.attr.execute.default_runfiles).merge(ctx.attr._job_wrapper.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles,
)
@ -165,12 +167,8 @@ _databuild_job_exec_rule = rule(
executable = True,
cfg = "target",
),
"_template": attr.label(
default = "@databuild//databuild/job:execute_wrapper.sh.tpl",
allow_single_file = True,
),
"_jq": attr.label(
default = "@databuild//databuild/runtime:jq",
"_job_wrapper": attr.label(
default = "@databuild//databuild/job:job_wrapper",
executable = True,
cfg = "target",
),

File diff suppressed because one or more lines are too long

View file

@ -1,5 +1,12 @@
# Job Wrapper v2 Plan
## Status
- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [PARTIAL]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]
## Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
@ -16,10 +23,10 @@ The job wrapper is a critical component that mediates between DataBuild graphs a
## Architecture
### Core Design Principles
1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms
3. **Zero Network Requirements**: Jobs don't need to connect to any services
4. **Fail-Safe**: Graceful handling of job crashes and fast completions
1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs [DONE]
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms [PARTIAL - local only]
3. **Zero Network Requirements**: Jobs don't need to connect to any services [DONE]
4. **Fail-Safe**: Graceful handling of job crashes and fast completions [PARTIAL - basic handling only]
### Communication Model
```
@ -30,23 +37,23 @@ Graph: Tails logs and interprets into metrics, events, and manifests
## Structured Log Protocol
### Message Format (Protobuf)
### Message Format (Protobuf) [DONE]
```proto
message JobLogEntry {
string timestamp = 1;
string job_id = 2;
string partition_ref = 3;
uint64 sequence_number = 4; // Monotonic sequence starting from 1
uint64 sequence_number = 4; // Monotonic sequence starting from 1 [DONE]
oneof content {
LogMessage log = 5;
MetricPoint metric = 6;
JobEvent event = 7;
PartitionManifest manifest = 8;
LogMessage log = 5; // [DONE]
MetricPoint metric = 6; // [FUTURE]
JobEvent event = 7; // [DONE - WrapperJobEvent]
PartitionManifest manifest = 8; // [DONE]
}
}
message LogMessage {
message LogMessage { // [DONE]
enum LogLevel {
DEBUG = 0;
INFO = 1;
@ -58,14 +65,14 @@ message LogMessage {
map<string, string> fields = 3;
}
message MetricPoint {
message MetricPoint { // [FUTURE]
string name = 1;
double value = 2;
map<string, string> labels = 3;
string unit = 4;
}
message JobEvent {
message JobEvent { // [DONE - as WrapperJobEvent]
string event_type = 1; // "task_launched", "heartbeat", "task_completed", etc
google.protobuf.Any details = 2;
map<string, string> metadata = 3;
@ -73,31 +80,32 @@ message JobEvent {
```
### Log Stream Lifecycle
1. Wrapper emits `job_config_started` event (sequence #1)
2. Wrapper validates configuration
3. Wrapper emits `task_launched` event (sequence #2)
4. Job executes, wrapper captures stdout/stderr (sequence #3+)
5. Wrapper emits periodic `heartbeat` events (every 30s)
6. Wrapper detects job completion
7. Wrapper emits `PartitionManifest` message (final required message with highest sequence number)
8. Wrapper exits
1. Wrapper emits `config_validate_success` event (sequence #1) [DONE]
2. Wrapper validates configuration [DONE]
3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
4. Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
5. Wrapper emits periodic `heartbeat` events (every 30s) [FUTURE]
6. Wrapper detects job completion [DONE]
7. Wrapper emits `task_success`/`task_failed` event [DONE]
8. Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
9. Wrapper exits [DONE]
The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection.
The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection. [DONE - sequence numbers implemented]
## Wrapper Implementation
## Wrapper Implementation [PARTIAL]
### Interfaces
### Interfaces [DONE]
```rust
trait JobWrapper {
// Config mode - accepts PartitionRef objects
fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>;
fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>; // [DONE]
// Exec mode - accepts serialized JobConfig
fn exec(config: JobConfig) -> Result<()>;
fn exec(config: JobConfig) -> Result<()>; // [DONE]
}
```
### Exit Code Standards
### Exit Code Standards [PARTIAL]
Following POSIX conventions and avoiding collisions with standard exit codes:
@ -106,7 +114,7 @@ Reference:
- https://tldp.org/LDP/abs/html/exitcodes.html
```rust
// Standard POSIX codes we respect:
// Standard POSIX codes we respect: [PARTIAL - basic forwarding only]
// 0 - Success
// 1 - General error
// 2 - Misuse of shell builtin
@ -122,13 +130,13 @@ Reference:
// 77 - Permission denied (EX_NOPERM)
// 78 - Configuration error (EX_CONFIG)
// DataBuild-specific codes (100+ to avoid collisions):
// DataBuild-specific codes (100+ to avoid collisions): [FUTURE]
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures
// 120-129 - User-defined resource failures
// 130+ - Other user-defined codes
enum ExitCodeCategory {
enum ExitCodeCategory { // [FUTURE]
Success, // 0
StandardError, // 1-63 (shell/system)
PosixError, // 64-78 (sysexits.h)
@ -139,29 +147,29 @@ enum ExitCodeCategory {
## Platform-Specific Log Handling
### Local Execution
- Graph spawns wrapper process
- Graph reads from stdout pipe directly
- PartitionManifest indicates completion
### Local Execution [DONE]
- Graph spawns wrapper process [DONE]
- Graph reads from stdout pipe directly [DONE]
- PartitionManifest indicates completion [DONE]
### Docker
### Docker [FUTURE]
- Graph runs `docker run` with wrapper as entrypoint
- Graph uses `docker logs -f` to tail output
- Logs persist after container exit
### Kubernetes
### Kubernetes [FUTURE]
- Job pods use wrapper as container entrypoint
- Graph tails logs via K8s API
- Configure `terminationGracePeriodSeconds` for log retention
### Cloud Run / Lambda
### Cloud Run / Lambda [FUTURE]
- Wrapper logs to platform logging service
- Graph queries logs via platform API
- Natural buffering and persistence
## Observability Features
### Metrics Collection
### Metrics Collection [FUTURE]
For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose via Prometheus format:
@ -182,14 +190,14 @@ For metrics, we'll use a simplified StatsD-like format in our structured logs, w
}
```
The graph component will:
The graph component will: [FUTURE]
- Aggregate metrics from job logs
- Expose them in Prometheus format for scraping (when running as a service)
- Store summary metrics in the BEL for historical analysis
For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
### Heartbeating
### Heartbeating [FUTURE]
Fixed 30-second heartbeat interval (based on Kubernetes best practices):
@ -208,7 +216,7 @@ Fixed 30-second heartbeat interval (based on Kubernetes best practices):
}
```
### Log Bandwidth Limits
### Log Bandwidth Limits [FUTURE]
To prevent log flooding:
- Maximum log rate: 1000 messages/second
@ -218,76 +226,95 @@ To prevent log flooding:
## Testing Strategy
### Unit Tests
### Unit Tests [FUTURE]
- Log parsing and serialization
- Exit code categorization
- Rate limiting behavior
- State machine transitions
### Integration Tests
- Full job execution lifecycle
- Platform-specific log tailing
- Fast job completion handling
- Large log volume handling
### Integration Tests [PARTIAL]
- Full job execution lifecycle [DONE - via e2e tests]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]
### Platform Tests
- Local process execution
- Docker container runs
- Kubernetes job pods
- Cloud Run invocations
### Platform Tests [PARTIAL]
- Local process execution [DONE]
- Docker container runs [FUTURE]
- Kubernetes job pods [FUTURE]
- Cloud Run invocations [FUTURE]
### Failure Scenario Tests
- Job crashes (SIGSEGV, SIGKILL)
- Wrapper crashes
- Log tailing interruptions
- Platform-specific failures
### Failure Scenario Tests [PARTIAL]
- Job crashes (SIGSEGV, SIGKILL) [DONE - basic exit code forwarding]
- Wrapper crashes [FUTURE]
- Log tailing interruptions [FUTURE]
- Platform-specific failures [FUTURE]
## Implementation Phases
### Phase 0: Minimal Bootstrap
### Phase 0: Minimal Bootstrap [DONE]
Implement the absolute minimum to unblock development and testing:
- Basic wrapper that only handles happy path
- Support for local execution only
- Minimal log parsing in graph
- Integration with existing example jobs
- Basic wrapper that only handles happy path [DONE]
- Support for local execution only [DONE]
- Minimal log parsing in graph [DONE - wrapper emits structured logs]
- Integration with existing example jobs [DONE - e2e tests passing]
This phase delivers a working end-to-end system that can be continuously evolved.
This phase delivers a working end-to-end system that can be continuously evolved. [DONE]
### Phase 1: Core Protocol
- Define protobuf schemas
- Implement structured logger
- Add error handling and exit codes
- Implement heartbeating
- Graph-side log parser improvements
**Completed Implementation Details:**
- Created databuild/job/src/main.rs with config/exec modes [DONE]
- Uses protobuf types from databuild.proto [DONE]
- Emits JobLogEntry with sequence numbers [DONE]
- Follows core-build.md state diagram exactly [DONE]
- Forwards job stdout/stderr as LogMessage entries [DONE]
- Emits PartitionManifest on successful completion [DONE]
- Properly handles job failures with exit codes [DONE]
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]
### Phase 2: Platform Support
- Docker integration
- Kubernetes support
- Cloud platform adapters
- Platform-specific testing
### Phase 1: Core Protocol [PARTIAL]
- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [FUTURE]
- Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]
### Phase 3: Production Hardening
- Rate limiting
- Error recovery
- Performance optimization
- Monitoring integration
### Phase 2: Platform Support [FUTURE]
- Docker integration [FUTURE]
- Kubernetes support [FUTURE]
- Cloud platform adapters [FUTURE]
- Platform-specific testing [FUTURE]
### Phase 4: Advanced Features
- In-process config for library jobs
- Custom metrics backends
- Advanced failure analysis
### Phase 3: Production Hardening [FUTURE]
- Rate limiting [FUTURE]
- Error recovery [FUTURE]
- Performance optimization [FUTURE]
- Monitoring integration [FUTURE]
### Phase 4: Advanced Features [FUTURE]
- In-process config for library jobs [FUTURE]
- Custom metrics backends [FUTURE]
- Advanced failure analysis [FUTURE]
## Success Criteria
1. **Zero Network Dependencies**: Jobs run without any network access
2. **Platform Parity**: Identical behavior across all execution platforms
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec
4. **Complete Observability**: All job state changes captured in logs
5. **Graceful Failures**: No log data loss even in crash scenarios
1. **Zero Network Dependencies**: Jobs run without any network access [DONE]
2. **Platform Parity**: Identical behavior across all execution platforms [PARTIAL - local only]
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec [DONE - fast execution]
4. **Complete Observability**: All job state changes captured in logs [DONE - core events captured]
5. **Graceful Failures**: No log data loss even in crash scenarios [PARTIAL - basic failure handling]
## Next Steps
1. Implement minimal bootstrap wrapper
2. Test with existing example jobs
3. Iterate on log format based on real usage
4. Gradually add features per implementation phases
1. Implement minimal bootstrap wrapper [DONE]
2. Test with existing example jobs [DONE]
3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
4. Gradually add features per implementation phases [IN PROGRESS]
**Immediate Next Steps for Phase 1 Completion:**
- Add heartbeating support [FUTURE]
- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]

View file

@ -84,36 +84,12 @@ main() {
# Only clean if we detect Java version mismatches
if bazel info 2>&1 | grep -q "openjdk/23"; then
log_warn "Detected stale Java paths, cleaning Bazel caches..."
(cd "$SCRIPT_DIR/examples/basic_graph" && bazel clean --expunge > /dev/null 2>&1 || true)
(cd "$SCRIPT_DIR/examples/podcast_reviews" && bazel clean --expunge > /dev/null 2>&1 || true)
else
log_info "Java environment looks good, skipping cache clean"
fi
# Test 1: Basic Graph
log_info "=== Basic Graph End-to-End Tests ==="
# Build basic graph targets
build_targets "$SCRIPT_DIR/examples/basic_graph" \
"//:basic_graph.build" \
"//:basic_graph.service"
# Run basic graph simple test
run_test "Basic Graph Simple Test" \
"$TESTS_DIR/simple_test.sh" \
"$SCRIPT_DIR/examples/basic_graph/bazel-bin/basic_graph.build" \
"$SCRIPT_DIR/examples/basic_graph/bazel-bin/basic_graph.service"
# Run delegation test for basic graph
log_info "Running test: Basic Graph Delegation Test"
if ! (cd "$SCRIPT_DIR/examples/basic_graph" && \
"$SCRIPT_DIR/tests/end_to_end/delegation_test.sh" \
"bazel-bin/basic_graph.build"); then
test_fail "Test failed: Basic Graph Delegation Test"
fi
test_pass "Test passed: Basic Graph Delegation Test"
# Test 2: Podcast Reviews
# Test 1: Podcast Reviews
log_info "=== Podcast Reviews End-to-End Tests ==="
# Build podcast reviews targets - fail fast if build fails
@ -153,8 +129,6 @@ main() {
# Summary
log_info "=== Test Summary ==="
test_pass "Basic Graph CLI and Service builds work correctly"
test_pass "Basic Graph delegation prevents duplicate builds"
test_pass "Podcast Reviews CLI build works correctly"
test_pass "Podcast Reviews delegation prevents duplicate builds"
test_pass "Build event logging functions properly"
@ -176,7 +150,6 @@ main() {
cleanup() {
log_info "Cleaning up test processes..."
pkill -f "build_graph_service" 2>/dev/null || true
pkill -f "basic_graph.service" 2>/dev/null || true
pkill -f "podcast_reviews_graph.service" 2>/dev/null || true
}