databuild/databuild/job/main.rs

use std::env;
use std::io::{self, Read, Write};
use std::process::{Command, Stdio};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// All serialization handled by protobuf serde derives
use serde_json;
use sysinfo::{Pid, ProcessRefreshKind, System};
use uuid::Uuid;
// Import protobuf types from databuild
use databuild::{
job_log_entry, log_message, JobConfig, JobLabel, JobLogEntry, LogMessage, PartitionManifest,
PartitionRef, Task, WrapperJobEvent,
};
// All types now come from protobuf - no custom structs needed
// Configuration constants
const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000; // 30 seconds
const DEFAULT_METRICS_INTERVAL_MS: u64 = 100; // 100 milliseconds
const TEST_HEARTBEAT_INTERVAL_MS: u64 = 100; // Fast heartbeats for testing
const TEST_METRICS_INTERVAL_MS: u64 = 50; // Fast metrics for testing
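// Event sequence emitted by this wrapper (all as JobLogEntry JSON lines):
// config mode:  config_validate_success, then the {"configs": [...]} blob on stdout
// exec mode:    config_validate_success -> task_launch_success -> heartbeat(s)
//               -> captured stdout/stderr as Log entries -> job_summary
//               -> task_success + PartitionManifest on success,
//                  or task_failed + job_exec_fail on failure
// Both intervals above can be overridden at runtime, e.g.
// DATABUILD_HEARTBEAT_INTERVAL_MS=5000 (see exec_mode_with_config below).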
#[derive(Debug)]
struct HeartbeatMessage {
entry: JobLogEntry,
}
fn get_timestamp() -> String {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
.to_string()
}
trait LogSink {
fn emit(&mut self, entry: JobLogEntry);
}
struct StdoutSink;
impl LogSink for StdoutSink {
fn emit(&mut self, entry: JobLogEntry) {
println!("{}", serde_json::to_string(&entry).unwrap());
}
}
struct JobWrapper<S: LogSink> {
job_id: String,
sequence_number: u64,
start_time: i64,
sink: S,
}
impl JobWrapper<StdoutSink> {
fn new() -> Self {
Self::new_with_sink(StdoutSink)
}
}
impl<S: LogSink> JobWrapper<S> {
fn new_with_sink(sink: S) -> Self {
// Use job ID from environment if provided by graph execution, otherwise generate one
let job_id = env::var("DATABUILD_JOB_RUN_ID")
.unwrap_or_else(|_| Uuid::new_v4().to_string());
Self {
job_id,
sequence_number: 0,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
sink,
}
}
fn next_sequence(&mut self) -> u64 {
self.sequence_number += 1;
self.sequence_number
}
fn emit_log(&mut self, outputs: &[PartitionRef], content: job_log_entry::Content) {
let entry = JobLogEntry {
timestamp: get_timestamp(),
job_id: self.job_id.clone(),
outputs: outputs.to_vec(),
sequence_number: self.next_sequence(),
content: Some(content),
};
self.sink.emit(entry);
}
fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
// Convert to PartitionRef objects
let output_refs: Vec<PartitionRef> = outputs
.iter()
.map(|s| PartitionRef { r#str: s.clone() })
.collect();
// Following the state diagram: wrapper_validate_config -> emit_config_validate_success
self.emit_log(
&output_refs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "config_validate_success".to_string(),
metadata: std::collections::HashMap::new(),
job_status: None,
exit_code: None,
}),
);
// For Phase 0, we still need to produce the expected JSON config format
// so the current graph system can parse it. Later phases will change this.
let config = JobConfig {
outputs: output_refs.clone(),
inputs: vec![],
args: outputs.clone(),
env: {
let mut env_map = std::collections::HashMap::new();
if let Some(partition_ref) = outputs.first() {
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
}
env_map
},
};
// For config mode, we need to output the standard config format to stdout
// The structured logs will come later during exec mode
let configs_wrapper = serde_json::json!({
"configs": [config]
});
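// Illustrative output shape (hypothetical partition ref; field names match
// the serde derives exercised by the tests below):
// {"configs":[{"outputs":[{"str":"reviews/date=2025-01-01"}],"inputs":[],
//   "args":["reviews/date=2025-01-01"],"env":{"PARTITION_REF":"reviews/date=2025-01-01"}}]}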
println!("{}", serde_json::to_string(&configs_wrapper)?);
Ok(())
}
fn exec_mode(&mut self, job_binary: &str) -> Result<(), Box<dyn std::error::Error>> {
// Read the job config from stdin
let mut buffer = String::new();
io::stdin().read_to_string(&mut buffer)?;
let config: JobConfig = serde_json::from_str(&buffer)?;
self.exec_mode_with_config(job_binary, config)
}
fn exec_mode_with_config(
&mut self,
job_binary: &str,
config: JobConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let outputs = &config.outputs;
// Following the state diagram:
// 1. wrapper_validate_config -> emit_config_validate_success
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "config_validate_success".to_string(),
job_status: None,
exit_code: None,
metadata: std::collections::HashMap::new(),
}),
);
// 2. wrapper_launch_task -> emit_task_launch_success
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_launch_success".to_string(),
job_status: None,
exit_code: None,
metadata: std::collections::HashMap::new(),
}),
);
// Execute the original job binary with the exec subcommand
let mut cmd = Command::new(job_binary);
cmd.arg("exec");
// Add the args from the config
for arg in &config.args {
cmd.arg(arg);
}
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
// Set environment variables from config
for (key, value) in &config.env {
cmd.env(key, value);
}
let mut child = cmd.spawn()?;
let child_pid = child.id();
// Send the config to the job, then drop stdin so the child sees EOF
if let Some(mut stdin) = child.stdin.take() {
    stdin.write_all(serde_json::to_string(&config)?.as_bytes())?;
}
// Start heartbeat thread with channel communication
let heartbeat_job_id = self.job_id.clone();
let heartbeat_outputs = outputs.clone();
// Seed the heartbeat counter from the current sequence number so heartbeat
// entries do not collide with the events already emitted on this stream
let heartbeat_sequence = Arc::new(Mutex::new(self.sequence_number));
let heartbeat_sequence_clone = heartbeat_sequence.clone();
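// Heartbeats are routed through this channel rather than printed from the
// thread, so the main thread's LogSink remains the single writer and JSON
// lines are never interleaved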
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<HeartbeatMessage>();
let heartbeat_handle = thread::spawn(move || {
let mut system = System::new_all();
let pid = Pid::from(child_pid as usize);
let heartbeat_interval_ms = env::var("DATABUILD_HEARTBEAT_INTERVAL_MS")
.unwrap_or_else(|_| DEFAULT_HEARTBEAT_INTERVAL_MS.to_string())
.parse::<u64>()
.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL_MS);
loop {
thread::sleep(Duration::from_millis(heartbeat_interval_ms));
// Refresh process info, including CPU usage (matching the metrics loop below)
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
// Check if process still exists
if let Some(process) = system.process(pid) {
let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
let cpu_percent = process.cpu_usage();
// Create heartbeat event with metrics
let mut metadata = std::collections::HashMap::new();
metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
metadata.insert(
"cpu_usage_percent".to_string(),
format!("{:.3}", cpu_percent),
);
// Get next sequence number for heartbeat
let seq = {
let mut seq_lock = heartbeat_sequence_clone.lock().unwrap();
*seq_lock += 1;
*seq_lock
};
let heartbeat_event = JobLogEntry {
timestamp: get_timestamp(),
job_id: heartbeat_job_id.clone(),
outputs: heartbeat_outputs.clone(),
sequence_number: seq,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "heartbeat".to_string(),
job_status: None,
exit_code: None,
metadata,
})),
};
// Send heartbeat through channel instead of printing directly
if heartbeat_tx.send(HeartbeatMessage { entry: heartbeat_event }).is_err() {
// Main thread dropped receiver, exit
break;
}
} else {
// Process no longer exists, exit heartbeat thread
break;
}
}
});
// Track metrics while job is running
let job_start_time = SystemTime::now();
let mut system = System::new();
let pid = Pid::from(child_pid as usize);
// Initial refresh to establish baseline for CPU measurements
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
let mut peak_memory_mb = 0.0f64;
let mut cpu_samples = Vec::new();
let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new();
// Resolve the sampling interval, then sleep briefly so the process can
// start up before the first measurement
let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
    .unwrap_or_else(|_| DEFAULT_METRICS_INTERVAL_MS.to_string())
    .parse::<u64>()
    .unwrap_or(DEFAULT_METRICS_INTERVAL_MS);
thread::sleep(Duration::from_millis(sample_interval_ms));
// Poll process status and metrics
let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
// Check if process has exited
match child.try_wait()? {
Some(status) => {
// Process has exited, collect any remaining output
if let Some(mut stdout) = child.stdout.take() {
stdout.read_to_end(&mut stdout_buffer)?;
}
if let Some(mut stderr) = child.stderr.take() {
stderr.read_to_end(&mut stderr_buffer)?;
}
// Calculate final metrics
let job_duration = job_start_time.elapsed().map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!("Time calculation error: {}", e),
)
})?;
// Calculate CPU time: average CPU percentage * wall-clock time
let total_cpu_ms = if cpu_samples.is_empty() {
0.0
} else {
let avg_cpu_percent =
cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
(avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
};
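// e.g. an average of 50% CPU over a 2000 ms wall clock estimates
// (50.0 / 100.0) * 2000.0 = 1000.0 ms of CPU time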
// Detach the heartbeat thread; it exits on its own once the child process
// is gone or the channel receiver is dropped
drop(heartbeat_handle);
// Process any remaining heartbeat messages
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Advance the wrapper's sequence number past any heartbeats emitted
self.sequence_number =
    self.sequence_number.max(*heartbeat_sequence.lock().unwrap());
// Create output struct to match original behavior
let output = std::process::Output {
status,
stdout: stdout_buffer,
stderr: stderr_buffer,
};
break (output, peak_memory_mb, total_cpu_ms, job_duration);
}
None => {
// Check for heartbeat messages and emit them
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Process still running, collect metrics
// Refresh CPU info and processes
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
// Sleep to allow CPU measurement interval
thread::sleep(Duration::from_millis(sample_interval_ms));
// Refresh again to get updated CPU usage
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
if let Some(process) = system.process(pid) {
let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
peak_memory_mb = peak_memory_mb.max(memory_mb);
let cpu_usage = process.cpu_usage();
cpu_samples.push(cpu_usage);
}
}
}
};
let success = output.status.success();
let exit_code = output.status.code().unwrap_or(-1);
// Capture and forward job stdout/stderr as log messages
if !output.stdout.is_empty() {
let stdout_str = String::from_utf8_lossy(&output.stdout);
self.emit_log(
outputs,
job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Info as i32,
message: stdout_str.to_string(),
fields: std::collections::HashMap::new(),
}),
);
}
if !output.stderr.is_empty() {
let stderr_str = String::from_utf8_lossy(&output.stderr);
self.emit_log(
outputs,
job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Error as i32,
message: stderr_str.to_string(),
fields: std::collections::HashMap::new(),
}),
);
}
// Emit job summary with resource metrics
let mut summary_metadata = std::collections::HashMap::new();
summary_metadata.insert(
"runtime_ms".to_string(),
format!("{:.3}", job_duration.as_millis() as f64),
);
summary_metadata.insert(
"peak_memory_mb".to_string(),
format!("{:.3}", peak_memory_mb),
);
summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_summary".to_string(),
job_status: None,
exit_code: Some(exit_code),
metadata: summary_metadata,
}),
);
if success {
// Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(exit_code),
metadata: std::collections::HashMap::new(),
}),
);
// Then emit_partition_manifest -> success
let end_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
self.emit_log(
outputs,
job_log_entry::Content::Manifest(PartitionManifest {
outputs: config.outputs.clone(),
inputs: vec![], // Phase 0: no input manifests yet
start_time: self.start_time,
end_time,
task: Some(Task {
job: Some(JobLabel {
label: env::var("DATABUILD_JOB_LABEL")
.unwrap_or_else(|_| "unknown".to_string()),
}),
config: Some(config.clone()),
}),
metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
}),
);
} else {
// Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_failed".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(exit_code),
metadata: std::collections::HashMap::new(),
}),
);
// Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
self.emit_log(
outputs,
job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_exec_fail".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(exit_code),
metadata: {
let mut meta = std::collections::HashMap::new();
meta.insert(
"error".to_string(),
format!("Job failed with exit code {}", exit_code),
);
meta
},
}),
);
}
// Forward the original job's output to stdout for compatibility
io::stdout().write_all(&output.stdout)?;
io::stderr().write_all(&output.stderr)?;
if !success {
std::process::exit(exit_code);
}
Ok(())
}
}
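// Example invocations (illustrative; in normal operation the graph executor
// supplies the exec-mode config on stdin and sets DATABUILD_JOB_BINARY):
//
//   job_wrapper config reviews/date=2025-01-01
//   echo "$JOB_CONFIG_JSON" | DATABUILD_JOB_BINARY=./my_job job_wrapper exec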
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: job_wrapper <config|exec> [args...]");
std::process::exit(1);
}
let mode = &args[1];
let mut wrapper = JobWrapper::new();
match mode.as_str() {
"config" => {
let outputs = args[2..].to_vec();
wrapper.config_mode(outputs)?;
}
"exec" => {
// For exec mode, we need to know which original job binary to call
// For Phase 0, we'll derive this from environment or make it configurable
let job_binary =
env::var("DATABUILD_JOB_BINARY").unwrap_or_else(|_| "python3".to_string()); // Default fallback
wrapper.exec_mode(&job_binary)?;
}
_ => {
eprintln!("Unknown mode: {}", mode);
std::process::exit(1);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// Test infrastructure
struct TestSink {
entries: Vec<JobLogEntry>,
}
impl TestSink {
fn new() -> Self {
Self {
entries: Vec::new(),
}
}
fn find_event(&self, event_type: &str) -> Option<&JobLogEntry> {
self.entries.iter().find(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == event_type
} else {
false
}
})
}
}
impl LogSink for TestSink {
fn emit(&mut self, entry: JobLogEntry) {
self.entries.push(entry);
}
}
// Helper functions for testing
fn generate_test_config(outputs: &[String]) -> JobConfig {
JobConfig {
outputs: outputs
.iter()
.map(|s| PartitionRef { r#str: s.clone() })
.collect(),
inputs: vec![],
args: outputs.to_vec(),
env: {
let mut env_map = std::collections::HashMap::new();
if let Some(partition_ref) = outputs.first() {
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
}
env_map
},
}
}
#[test]
fn test_job_log_entry_serialization() {
let entry = JobLogEntry {
timestamp: "1234567890".to_string(),
job_id: "test-id".to_string(),
outputs: vec![PartitionRef { r#str: "test/partition".to_string() }],
sequence_number: 1,
content: Some(job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Info as i32,
message: "test message".to_string(),
fields: std::collections::HashMap::new(),
})),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"timestamp\":\"1234567890\""));
assert!(json.contains("\"sequence_number\":1"));
assert!(json.contains("\"Log\":{")); // Capitalized field name
assert!(json.contains("\"message\":\"test message\""));
}
#[test]
fn test_sequence_number_increment() {
let mut wrapper = JobWrapper::new();
assert_eq!(wrapper.next_sequence(), 1);
assert_eq!(wrapper.next_sequence(), 2);
assert_eq!(wrapper.next_sequence(), 3);
}
#[test]
fn test_config_mode_output_format() {
let outputs = vec!["test/partition".to_string()];
let config = generate_test_config(&outputs);
// Verify it produces expected structure
assert_eq!(config.outputs.len(), 1);
assert_eq!(config.outputs[0].r#str, "test/partition");
assert_eq!(config.args, outputs);
assert_eq!(
config.env.get("PARTITION_REF"),
Some(&"test/partition".to_string())
);
}
#[test]
fn test_multiple_outputs_config() {
let outputs = vec![
"reviews/date=2025-01-01".to_string(),
"reviews/date=2025-01-02".to_string(),
];
let config = generate_test_config(&outputs);
assert_eq!(config.outputs.len(), 2);
assert_eq!(config.outputs[0].r#str, "reviews/date=2025-01-01");
assert_eq!(config.outputs[1].r#str, "reviews/date=2025-01-02");
// First output is used as PARTITION_REF
assert_eq!(
config.env.get("PARTITION_REF"),
Some(&"reviews/date=2025-01-01".to_string())
);
}
#[test]
fn test_wrapper_job_event_creation() {
// Test success event
let event = WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_success");
assert_eq!(event.job_status, Some("JOB_COMPLETED".to_string()));
assert_eq!(event.exit_code, Some(0));
// Test failure event
let event = WrapperJobEvent {
event_type: "task_failed".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(1),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_failed");
assert_eq!(event.job_status, Some("JOB_FAILED".to_string()));
assert_eq!(event.exit_code, Some(1));
}
#[test]
fn test_log_message_levels() {
let info_log = LogMessage {
level: log_message::LogLevel::Info as i32,
message: "info message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(info_log.level, log_message::LogLevel::Info as i32);
let error_log = LogMessage {
level: log_message::LogLevel::Error as i32,
message: "error message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(error_log.level, log_message::LogLevel::Error as i32);
}
#[test]
fn test_partition_manifest_structure() {
let config = generate_test_config(&vec!["test/partition".to_string()]);
let manifest = PartitionManifest {
outputs: config.outputs.clone(),
inputs: vec![],
start_time: 1234567890,
end_time: 1234567900,
task: Some(Task {
job: Some(JobLabel {
label: "//test:job".to_string(),
}),
config: Some(config),
}),
metadata: std::collections::HashMap::new(),
};
assert_eq!(manifest.outputs.len(), 1);
assert_eq!(manifest.outputs[0].r#str, "test/partition");
assert_eq!(manifest.end_time - manifest.start_time, 10);
assert!(manifest.task.is_some());
}
#[test]
fn test_timestamp_generation() {
let ts1 = get_timestamp();
std::thread::sleep(std::time::Duration::from_millis(10));
let ts2 = get_timestamp();
// Timestamps should be parseable as integers
let t1: u64 = ts1.parse().expect("Should be valid timestamp");
let t2: u64 = ts2.parse().expect("Should be valid timestamp");
// Second timestamp should be equal or greater
assert!(t2 >= t1);
}
#[test]
fn test_job_wrapper_initialization() {
let wrapper = JobWrapper::new();
assert_eq!(wrapper.sequence_number, 0);
assert!(!wrapper.job_id.is_empty());
assert!(wrapper.start_time > 0);
}
#[test]
fn test_cpu_metrics_are_captured() {
use std::io::Write;
use tempfile::NamedTempFile;
// Create a CPU-intensive test script
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
let script_content = r#"#!/usr/bin/env python3
import sys
import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config":
config = {
"outputs": [{"str": "test/cpu"}],
"inputs": [],
"args": [],
"env": {"PARTITION_REF": "test/cpu"}
}
print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# CPU-intensive work that runs longer
start_time = time.time()
total = 0
while time.time() - start_time < 0.5: # Run for at least 500ms
total += sum(range(1_000_000))
print(f"Sum: {total}")
"#;
temp_file
.write_all(script_content.as_bytes())
.expect("Failed to write script");
let script_path = temp_file.path().to_str().unwrap();
// Make script executable
std::fs::set_permissions(
script_path,
std::os::unix::fs::PermissionsExt::from_mode(0o755),
)
.expect("Failed to set permissions");
// Set up environment for fast sampling and the test script
env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); // Even faster for CPU test
env::set_var("DATABUILD_JOB_BINARY", script_path);
// Create test sink and wrapper
let sink = TestSink::new();
let mut wrapper = JobWrapper::new_with_sink(sink);
// Create a JobConfig for the test
let config = JobConfig {
outputs: vec![PartitionRef {
r#str: "test/cpu".to_string(),
}],
inputs: vec![],
args: vec![],
env: {
let mut env_map = std::collections::HashMap::new();
env_map.insert("PARTITION_REF".to_string(), "test/cpu".to_string());
env_map
},
};
// exec_mode reads its config from stdin; use exec_mode_with_config, which
// takes the config directly, so the test does not have to simulate stdin
let result = wrapper.exec_mode_with_config(script_path, config);
// Clean up environment
env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
env::remove_var("DATABUILD_JOB_BINARY");
// Check that exec_mode succeeded
if let Err(e) = &result {
println!("exec_mode failed with error: {}", e);
}
assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
// Find the job_summary event
let summary_event = wrapper
.sink
.find_event("job_summary")
.expect("Should have job_summary event");
if let Some(job_log_entry::Content::JobEvent(event)) = &summary_event.content {
// Verify we have CPU metrics
let cpu_ms_str = event
.metadata
.get("total_cpu_ms")
.expect("Should have total_cpu_ms metric");
let cpu_ms: f64 = cpu_ms_str
.parse()
.expect("CPU metric should be valid float");
// For CPU-intensive work, we should get non-zero CPU time
assert!(
cpu_ms > 0.0,
"Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms",
cpu_ms
);
// Also verify runtime is reasonable
let runtime_ms_str = event
.metadata
.get("runtime_ms")
.expect("Should have runtime_ms metric");
let runtime_ms: f64 = runtime_ms_str
.parse()
.expect("Runtime metric should be valid float");
assert!(runtime_ms > 0.0, "Should have non-zero runtime");
println!(
"CPU test results: {:.3}ms CPU time over {:.3}ms runtime",
cpu_ms, runtime_ms
);
} else {
panic!("job_summary event should contain JobEvent");
}
}
#[test]
fn test_heartbeat_functionality() {
use std::io::Write;
use tempfile::NamedTempFile;
// Create a longer-running test script to trigger heartbeats
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
let script_content = r#"#!/usr/bin/env python3
import sys
import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config":
config = {
"outputs": [{"str": "test/heartbeat"}],
"inputs": [],
"args": [],
"env": {"PARTITION_REF": "test/heartbeat"}
}
print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# Sleep long enough to trigger at least 2 heartbeats
time.sleep(0.3) # 300ms with 100ms heartbeat interval should give us 2-3 heartbeats
print("Job completed")
"#;
temp_file
.write_all(script_content.as_bytes())
.expect("Failed to write script");
let script_path = temp_file.path().to_str().unwrap();
// Make script executable
std::fs::set_permissions(
script_path,
std::os::unix::fs::PermissionsExt::from_mode(0o755),
)
.expect("Failed to set permissions");
// Set up environment for fast heartbeats and the test script
env::set_var("DATABUILD_HEARTBEAT_INTERVAL_MS", &TEST_HEARTBEAT_INTERVAL_MS.to_string());
env::set_var("DATABUILD_METRICS_INTERVAL_MS", &TEST_METRICS_INTERVAL_MS.to_string());
env::set_var("DATABUILD_JOB_BINARY", script_path);
// Create test sink and wrapper
let sink = TestSink::new();
let mut wrapper = JobWrapper::new_with_sink(sink);
// Create a JobConfig for the test
let config = JobConfig {
outputs: vec![PartitionRef {
r#str: "test/heartbeat".to_string(),
}],
inputs: vec![],
args: vec![],
env: {
let mut env_map = std::collections::HashMap::new();
env_map.insert("PARTITION_REF".to_string(), "test/heartbeat".to_string());
env_map
},
};
// Run the job
let result = wrapper.exec_mode_with_config(script_path, config);
// Clean up environment
env::remove_var("DATABUILD_HEARTBEAT_INTERVAL_MS");
env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
env::remove_var("DATABUILD_JOB_BINARY");
// Check that exec_mode succeeded
assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
// Count heartbeat events
let heartbeat_count = wrapper
.sink
.entries
.iter()
.filter(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == "heartbeat"
} else {
false
}
})
.count();
// We should have at least 1 heartbeat event (possibly 2-3 depending on timing)
assert!(
heartbeat_count >= 1,
"Expected at least 1 heartbeat event, but got {}",
heartbeat_count
);
// Verify heartbeat event structure
let heartbeat_event = wrapper
.sink
.entries
.iter()
.find(|entry| {
if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
event.event_type == "heartbeat"
} else {
false
}
})
.expect("Should have at least one heartbeat event");
if let Some(job_log_entry::Content::JobEvent(event)) = &heartbeat_event.content {
// Verify heartbeat contains memory and CPU metrics
assert!(
event.metadata.contains_key("memory_usage_mb"),
"Heartbeat should contain memory_usage_mb"
);
assert!(
event.metadata.contains_key("cpu_usage_percent"),
"Heartbeat should contain cpu_usage_percent"
);
}
}
}