Add test for asserting measured cpu

Stuart Axelbrooke 2025-07-28 19:18:42 -07:00
parent 41ea8f129c
commit 7fd8b0a0d5
2 changed files with 361 additions and 199 deletions


@@ -22,5 +22,6 @@ rust_test(
         "@crates//:serde_json",
         "@crates//:uuid",
         "@crates//:sysinfo",
+        "@crates//:tempfile",
     ],
 )
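
The new tempfile dependency supports the rewritten CPU test below, which writes its helper script to a NamedTempFile instead of a hard-coded /tmp path. A minimal sketch of that crate usage (illustrative only, not code from this commit):

use std::io::Write;
use tempfile::NamedTempFile;

fn main() -> std::io::Result<()> {
    // Uniquely named file, removed automatically when `script` is dropped.
    let mut script = NamedTempFile::new()?;
    script.write_all(b"#!/usr/bin/env python3\nprint('ok')\n")?;
    println!("script lives at {}", script.path().display());
    Ok(())
}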


@@ -2,29 +2,56 @@ use std::env;
 use std::io::{self, Read, Write};
 use std::process::{Command, Stdio};
 use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, UNIX_EPOCH, Duration};
 use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};

 // All serialization handled by protobuf serde derives
 use serde_json;
+use sysinfo::{Pid, ProcessRefreshKind, System};
 use uuid::Uuid;
-use sysinfo::{System, ProcessRefreshKind, Pid};

 // Import protobuf types from databuild
 use databuild::{
-    PartitionRef, PartitionManifest, Task, JobLabel, JobConfig,
-    JobLogEntry, LogMessage, WrapperJobEvent, job_log_entry, log_message
+    job_log_entry, log_message, JobConfig, JobLabel, JobLogEntry, LogMessage, PartitionManifest,
+    PartitionRef, Task, WrapperJobEvent,
 };

 // All types now come from protobuf - no custom structs needed

-struct JobWrapper {
+fn get_timestamp() -> String {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_secs()
+        .to_string()
+}
+
+trait LogSink {
+    fn emit(&mut self, entry: JobLogEntry);
+}
+
+struct StdoutSink;
+
+impl LogSink for StdoutSink {
+    fn emit(&mut self, entry: JobLogEntry) {
+        println!("{}", serde_json::to_string(&entry).unwrap());
+    }
+}
+
+struct JobWrapper<S: LogSink> {
     job_id: String,
     sequence_number: u64,
     start_time: i64,
+    sink: S,
 }

-impl JobWrapper {
+impl JobWrapper<StdoutSink> {
     fn new() -> Self {
+        Self::new_with_sink(StdoutSink)
+    }
+}
+
+impl<S: LogSink> JobWrapper<S> {
+    fn new_with_sink(sink: S) -> Self {
         Self {
             job_id: Uuid::new_v4().to_string(),
             sequence_number: 0,
@@ -32,17 +59,10 @@ impl JobWrapper {
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_secs() as i64,
+            sink,
         }
     }

-    fn get_timestamp() -> String {
-        SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_secs()
-            .to_string()
-    }
-
     fn next_sequence(&mut self) -> u64 {
         self.sequence_number += 1;
         self.sequence_number
@@ -50,32 +70,38 @@ impl JobWrapper {
     fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
         let entry = JobLogEntry {
-            timestamp: Self::get_timestamp(),
+            timestamp: get_timestamp(),
             job_id: self.job_id.clone(),
             partition_ref: partition_ref.to_string(),
             sequence_number: self.next_sequence(),
             content: Some(content),
         };
-        println!("{}", serde_json::to_string(&entry).unwrap());
+        self.sink.emit(entry);
     }

     fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
         // Parse the partition ref from args (first argument)
         let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();

         // Following the state diagram: wrapper_validate_config -> emit_config_validate_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "config_validate_success".to_string(),
-            metadata: std::collections::HashMap::new(),
-            job_status: None,
-            exit_code: None,
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "config_validate_success".to_string(),
+                metadata: std::collections::HashMap::new(),
+                job_status: None,
+                exit_code: None,
+            }),
+        );

         // For Phase 0, we still need to produce the expected JSON config format
         // so the current graph system can parse it. Later phases will change this.
         let config = JobConfig {
-            outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
+            outputs: outputs
+                .iter()
+                .map(|s| PartitionRef { r#str: s.clone() })
+                .collect(),
             inputs: vec![],
             args: outputs.clone(),
             env: {
@@ -92,9 +118,9 @@ impl JobWrapper {
         let configs_wrapper = serde_json::json!({
             "configs": [config]
         });

         println!("{}", serde_json::to_string(&configs_wrapper)?);

         Ok(())
     }
@@ -102,38 +128,54 @@ impl JobWrapper {
         // Read the job config from stdin
         let mut buffer = String::new();
         io::stdin().read_to_string(&mut buffer)?;

         let config: JobConfig = serde_json::from_str(&buffer)?;
-        let partition_ref = config.outputs.first()
+        self.exec_mode_with_config(job_binary, config)
+    }
+
+    fn exec_mode_with_config(
+        &mut self,
+        job_binary: &str,
+        config: JobConfig,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        let partition_ref = config
+            .outputs
+            .first()
             .map(|p| p.str.clone())
             .unwrap_or_else(|| "unknown".to_string());

         // Following the state diagram:
         // 1. wrapper_validate_config -> emit_config_validate_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "config_validate_success".to_string(),
-            job_status: None,
-            exit_code: None,
-            metadata: std::collections::HashMap::new(),
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "config_validate_success".to_string(),
+                job_status: None,
+                exit_code: None,
+                metadata: std::collections::HashMap::new(),
+            }),
+        );

         // 2. wrapper_launch_task -> emit_task_launch_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "task_launch_success".to_string(),
-            job_status: None,
-            exit_code: None,
-            metadata: std::collections::HashMap::new(),
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "task_launch_success".to_string(),
+                job_status: None,
+                exit_code: None,
+                metadata: std::collections::HashMap::new(),
+            }),
+        );

         // Execute the original job binary with the exec subcommand
         let mut cmd = Command::new(job_binary);
         cmd.arg("exec");

         // Add the args from the config
         for arg in &config.args {
             cmd.arg(arg);
         }

         cmd.stdin(Stdio::piped())
             .stdout(Stdio::piped())
             .stderr(Stdio::piped());
@@ -148,7 +190,7 @@ impl JobWrapper {
         // Send the config to the job
         if let Some(stdin) = child.stdin.as_mut() {
-            stdin.write_all(buffer.as_bytes())?;
+            stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?;
         }

         // Start heartbeat thread
@@ -156,36 +198,39 @@ impl JobWrapper {
         let heartbeat_partition_ref = partition_ref.clone();
         let heartbeat_sequence = Arc::new(Mutex::new(0u64));
         let heartbeat_sequence_clone = heartbeat_sequence.clone();

         let heartbeat_handle = thread::spawn(move || {
             let mut system = System::new_all();
             let pid = Pid::from(child_pid as usize);

             loop {
                 thread::sleep(Duration::from_secs(30));

                 // Refresh process info
                 system.refresh_processes_specifics(ProcessRefreshKind::new());

                 // Check if process still exists
                 if let Some(process) = system.process(pid) {
                     let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                     let cpu_percent = process.cpu_usage();

                     // Create heartbeat event with metrics
                     let mut metadata = std::collections::HashMap::new();
                     metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
-                    metadata.insert("cpu_usage_percent".to_string(), format!("{:.3}", cpu_percent));
+                    metadata.insert(
+                        "cpu_usage_percent".to_string(),
+                        format!("{:.3}", cpu_percent),
+                    );

                     // Get next sequence number for heartbeat
                     let seq = {
                         let mut seq_lock = heartbeat_sequence_clone.lock().unwrap();
                         *seq_lock += 1;
                         *seq_lock
                     };

                     let heartbeat_event = JobLogEntry {
-                        timestamp: JobWrapper::get_timestamp(),
+                        timestamp: get_timestamp(),
                         job_id: heartbeat_job_id.clone(),
                         partition_ref: heartbeat_partition_ref.clone(),
                         sequence_number: seq,
@@ -196,7 +241,7 @@ impl JobWrapper {
                             metadata,
                         })),
                     };

                     // Print the heartbeat (thread-safe since println! is synchronized)
                     println!("{}", serde_json::to_string(&heartbeat_event).unwrap());
                 } else {
@@ -210,12 +255,12 @@ impl JobWrapper {
         let job_start_time = SystemTime::now();
         let mut system = System::new_all();
         let pid = Pid::from(child_pid as usize);

         let mut peak_memory_mb = 0.0f64;
         let mut cpu_samples = Vec::new();
         let mut stdout_buffer = Vec::new();
         let mut stderr_buffer = Vec::new();

         // Poll process status and metrics
         let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
             // Check if process has exited
@@ -228,46 +273,51 @@ impl JobWrapper {
                     if let Some(mut stderr) = child.stderr.take() {
                         stderr.read_to_end(&mut stderr_buffer)?;
                     }

                     // Calculate final metrics
-                    let job_duration = job_start_time.elapsed()
-                        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?;
+                    let job_duration = job_start_time.elapsed().map_err(|e| {
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!("Time calculation error: {}", e),
+                        )
+                    })?;

                     // Calculate CPU time: average CPU percentage * wall-clock time
                     let total_cpu_ms = if cpu_samples.is_empty() {
                         0.0
                     } else {
-                        let avg_cpu_percent = cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
+                        let avg_cpu_percent =
+                            cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
                         (avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
                     };

                     // Stop heartbeat thread
                     drop(heartbeat_handle);

                     // Update sequence number to account for heartbeats
                     let heartbeat_count = heartbeat_sequence.lock().unwrap();
                     self.sequence_number = self.sequence_number.max(*heartbeat_count);
                     drop(heartbeat_count);

                     // Create output struct to match original behavior
                     let output = std::process::Output {
                         status,
                         stdout: stdout_buffer,
                         stderr: stderr_buffer,
                     };

                     break (output, peak_memory_mb, total_cpu_ms, job_duration);
                 }
                 None => {
                     // Process still running, collect metrics
                     system.refresh_processes_specifics(ProcessRefreshKind::new());
                     if let Some(process) = system.process(pid) {
                         let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                         peak_memory_mb = peak_memory_mb.max(memory_mb);
                         cpu_samples.push(process.cpu_usage());
                     }

                     // Sleep briefly before next poll (configurable, default 100ms)
                     let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
                         .unwrap_or_else(|_| "100".to_string())
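
For intuition, the total_cpu_ms computed in the hunk above is just the average sampled CPU percentage applied to wall-clock milliseconds. A worked example with made-up numbers (assumed values, not from this commit):

fn main() {
    // Three sysinfo samples taken while the child ran (percent CPU).
    let cpu_samples: Vec<f32> = vec![40.0, 60.0, 50.0];
    let wall_clock_ms = 2_000.0_f64; // measured job duration

    let avg_cpu_percent = cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
    let total_cpu_ms = (avg_cpu_percent / 100.0) * wall_clock_ms;

    // 50% average utilization over 2 seconds is 1 second of CPU time.
    assert_eq!(total_cpu_ms, 1_000.0);
}

Note that sysinfo's per-process cpu_usage() can exceed 100% on multi-core machines, so total_cpu_ms can legitimately exceed wall-clock time.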
@@ -283,44 +333,62 @@ impl JobWrapper {
         // Capture and forward job stdout/stderr as log messages
         if !output.stdout.is_empty() {
             let stdout_str = String::from_utf8_lossy(&output.stdout);
-            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
-                level: log_message::LogLevel::Info as i32,
-                message: stdout_str.to_string(),
-                fields: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Log(LogMessage {
+                    level: log_message::LogLevel::Info as i32,
+                    message: stdout_str.to_string(),
+                    fields: std::collections::HashMap::new(),
+                }),
+            );
         }

         if !output.stderr.is_empty() {
             let stderr_str = String::from_utf8_lossy(&output.stderr);
-            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
-                level: log_message::LogLevel::Error as i32,
-                message: stderr_str.to_string(),
-                fields: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Log(LogMessage {
+                    level: log_message::LogLevel::Error as i32,
+                    message: stderr_str.to_string(),
+                    fields: std::collections::HashMap::new(),
+                }),
+            );
         }

         // Emit job summary with resource metrics
         let mut summary_metadata = std::collections::HashMap::new();
-        summary_metadata.insert("runtime_ms".to_string(), format!("{:.3}", job_duration.as_millis() as f64));
-        summary_metadata.insert("peak_memory_mb".to_string(), format!("{:.3}", peak_memory_mb));
+        summary_metadata.insert(
+            "runtime_ms".to_string(),
+            format!("{:.3}", job_duration.as_millis() as f64),
+        );
+        summary_metadata.insert(
+            "peak_memory_mb".to_string(),
+            format!("{:.3}", peak_memory_mb),
+        );
         summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
         summary_metadata.insert("exit_code".to_string(), exit_code.to_string());

-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "job_summary".to_string(),
-            job_status: None,
-            exit_code: Some(exit_code),
-            metadata: summary_metadata,
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "job_summary".to_string(),
+                job_status: None,
+                exit_code: Some(exit_code),
+                metadata: summary_metadata,
+            }),
+        );

         if success {
             // Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "task_success".to_string(),
-                job_status: Some("JOB_COMPLETED".to_string()),
-                exit_code: Some(exit_code),
-                metadata: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::JobEvent(WrapperJobEvent {
+                    event_type: "task_success".to_string(),
+                    job_status: Some("JOB_COMPLETED".to_string()),
+                    exit_code: Some(exit_code),
+                    metadata: std::collections::HashMap::new(),
+                }),
+            );

             // Then emit_partition_manifest -> success
             let end_time = SystemTime::now()
@@ -328,39 +396,52 @@ impl JobWrapper {
                 .unwrap()
                 .as_secs() as i64;

-            self.emit_log(&partition_ref, job_log_entry::Content::Manifest(PartitionManifest {
-                outputs: config.outputs.clone(),
-                inputs: vec![], // Phase 0: no input manifests yet
-                start_time: self.start_time,
-                end_time,
-                task: Some(Task {
-                    job: Some(JobLabel {
-                        label: env::var("DATABUILD_JOB_LABEL").unwrap_or_else(|_| "unknown".to_string()),
-                    }),
-                    config: Some(config.clone()),
-                }),
-                metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Manifest(PartitionManifest {
+                    outputs: config.outputs.clone(),
+                    inputs: vec![], // Phase 0: no input manifests yet
+                    start_time: self.start_time,
+                    end_time,
+                    task: Some(Task {
+                        job: Some(JobLabel {
+                            label: env::var("DATABUILD_JOB_LABEL")
+                                .unwrap_or_else(|_| "unknown".to_string()),
+                        }),
+                        config: Some(config.clone()),
+                    }),
+                    metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
+                }),
+            );
         } else {
             // Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "task_failed".to_string(),
-                job_status: Some("JOB_FAILED".to_string()),
-                exit_code: Some(exit_code),
-                metadata: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::JobEvent(WrapperJobEvent {
+                    event_type: "task_failed".to_string(),
+                    job_status: Some("JOB_FAILED".to_string()),
+                    exit_code: Some(exit_code),
+                    metadata: std::collections::HashMap::new(),
+                }),
+            );

             // Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "job_exec_fail".to_string(),
-                job_status: Some("JOB_FAILED".to_string()),
-                exit_code: Some(exit_code),
-                metadata: {
-                    let mut meta = std::collections::HashMap::new();
-                    meta.insert("error".to_string(), format!("Job failed with exit code {}", exit_code));
-                    meta
-                },
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::JobEvent(WrapperJobEvent {
+                    event_type: "job_exec_fail".to_string(),
+                    job_status: Some("JOB_FAILED".to_string()),
+                    exit_code: Some(exit_code),
+                    metadata: {
+                        let mut meta = std::collections::HashMap::new();
+                        meta.insert(
+                            "error".to_string(),
+                            format!("Job failed with exit code {}", exit_code),
+                        );
+                        meta
+                    },
+                }),
+            );
         }

         // Forward the original job's output to stdout for compatibility
@@ -377,7 +458,7 @@ impl JobWrapper {
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args: Vec<String> = env::args().collect();

     if args.len() < 2 {
         eprintln!("Usage: job_wrapper <config|exec> [args...]");
         std::process::exit(1);
@@ -394,9 +475,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         "exec" => {
             // For exec mode, we need to know which original job binary to call
             // For Phase 0, we'll derive this from environment or make it configurable
-            let job_binary = env::var("DATABUILD_JOB_BINARY")
-                .unwrap_or_else(|_| "python3".to_string()); // Default fallback
+            let job_binary =
+                env::var("DATABUILD_JOB_BINARY").unwrap_or_else(|_| "python3".to_string()); // Default fallback
             wrapper.exec_mode(&job_binary)?;
         }
         _ => {
@@ -411,11 +492,43 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
 #[cfg(test)]
 mod tests {
     use super::*;

+    // Test infrastructure
+    struct TestSink {
+        entries: Vec<JobLogEntry>,
+    }
+
+    impl TestSink {
+        fn new() -> Self {
+            Self {
+                entries: Vec::new(),
+            }
+        }
+
+        fn find_event(&self, event_type: &str) -> Option<&JobLogEntry> {
+            self.entries.iter().find(|entry| {
+                if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
+                    event.event_type == event_type
+                } else {
+                    false
+                }
+            })
+        }
+    }
+
+    impl LogSink for TestSink {
+        fn emit(&mut self, entry: JobLogEntry) {
+            self.entries.push(entry);
+        }
+    }
+
     // Helper functions for testing
     fn generate_test_config(outputs: &[String]) -> JobConfig {
         JobConfig {
-            outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
+            outputs: outputs
+                .iter()
+                .map(|s| PartitionRef { r#str: s.clone() })
+                .collect(),
             inputs: vec![],
             args: outputs.to_vec(),
             env: {
@@ -441,11 +554,11 @@ mod tests {
                 fields: std::collections::HashMap::new(),
             })),
         };

         let json = serde_json::to_string(&entry).unwrap();
         assert!(json.contains("\"timestamp\":\"1234567890\""));
         assert!(json.contains("\"sequence_number\":1"));
         assert!(json.contains("\"Log\":{")); // Capitalized field name
         assert!(json.contains("\"message\":\"test message\""));
     }
@@ -461,12 +574,15 @@ mod tests {
     fn test_config_mode_output_format() {
         let outputs = vec!["test/partition".to_string()];
         let config = generate_test_config(&outputs);

         // Verify it produces expected structure
         assert_eq!(config.outputs.len(), 1);
         assert_eq!(config.outputs[0].r#str, "test/partition");
         assert_eq!(config.args, outputs);
-        assert_eq!(config.env.get("PARTITION_REF"), Some(&"test/partition".to_string()));
+        assert_eq!(
+            config.env.get("PARTITION_REF"),
+            Some(&"test/partition".to_string())
+        );
     }

     #[test]
@@ -476,12 +592,15 @@ mod tests {
             "reviews/date=2025-01-02".to_string(),
         ];
         let config = generate_test_config(&outputs);

         assert_eq!(config.outputs.len(), 2);
         assert_eq!(config.outputs[0].r#str, "reviews/date=2025-01-01");
         assert_eq!(config.outputs[1].r#str, "reviews/date=2025-01-02");

         // First output is used as PARTITION_REF
-        assert_eq!(config.env.get("PARTITION_REF"), Some(&"reviews/date=2025-01-01".to_string()));
+        assert_eq!(
+            config.env.get("PARTITION_REF"),
+            Some(&"reviews/date=2025-01-01".to_string())
+        );
     }

     #[test]
@@ -551,14 +670,14 @@ mod tests {
     #[test]
     fn test_timestamp_generation() {
-        let ts1 = JobWrapper::get_timestamp();
+        let ts1 = get_timestamp();
         std::thread::sleep(std::time::Duration::from_millis(10));
-        let ts2 = JobWrapper::get_timestamp();
+        let ts2 = get_timestamp();

         // Timestamps should be parseable as integers
         let t1: u64 = ts1.parse().expect("Should be valid timestamp");
         let t2: u64 = ts2.parse().expect("Should be valid timestamp");

         // Second timestamp should be equal or greater
         assert!(t2 >= t1);
     }
@@ -573,75 +692,117 @@ mod tests {
     #[test]
     fn test_cpu_metrics_are_captured() {
-        use std::process::{Command, Stdio};
         use std::io::Write;
+        use std::process::{Command, Stdio};
+        use tempfile::NamedTempFile;

-        // Set faster sampling interval for test
-        env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
-
-        // Create a CPU-intensive Python script
-        let cpu_script = r#"
+        // Create a CPU-intensive test script
+        let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
+        let script_content = r#"#!/usr/bin/env python3
 import sys
+import json

 if len(sys.argv) > 1 and sys.argv[1] == "config":
-    print('{"configs":[{"outputs":[{"str":"test/cpu"}],"inputs":[],"args":[],"env":{}}]}')
+    config = {
+        "outputs": [{"str": "test/cpu"}],
+        "inputs": [],
+        "args": [],
+        "env": {"PARTITION_REF": "test/cpu"}
+    }
+    print(json.dumps({"configs": [config]}))
 elif len(sys.argv) > 1 and sys.argv[1] == "exec":
     # CPU-intensive work
-    total = sum(range(10_000_000))
+    total = sum(range(50_000_000))  # Smaller for faster test
     print(f"Sum: {total}")
 "#;
-        let script_path = "/tmp/databuild_test_cpu.py";
-        std::fs::write(script_path, cpu_script).expect("Failed to write test script");
-
-        // Run wrapper with the CPU-intensive job
-        env::set_var("DATABUILD_JOB_BINARY", "python3");
-
-        let output = Command::new(env!("CARGO_BIN_EXE_job_wrapper"))
-            .arg("exec")
-            .stdin(Stdio::piped())
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()
-            .and_then(|mut child| {
-                // Send config to stdin
-                let config = JobConfig {
-                    outputs: vec![PartitionRef { r#str: "test/cpu".to_string() }],
-                    inputs: vec![],
-                    args: vec![script_path.to_string()],
-                    env: std::collections::HashMap::new(),
-                };
-                let config_json = serde_json::to_string(&config).unwrap();
-
-                if let Some(stdin) = child.stdin.as_mut() {
-                    stdin.write_all(config_json.as_bytes()).unwrap();
-                }
-
-                child.wait_with_output()
-            })
-            .expect("Failed to run wrapper");
-
-        // Parse output to find job_summary
-        let stdout = String::from_utf8_lossy(&output.stdout);
-        let mut cpu_ms = None;
-
-        for line in stdout.lines() {
-            if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
-                if json["content"]["JobEvent"]["event_type"] == "job_summary" {
-                    if let Some(cpu_str) = json["content"]["JobEvent"]["metadata"]["total_cpu_ms"].as_str() {
-                        cpu_ms = Some(cpu_str.parse::<f64>().unwrap());
-                    }
-                }
-            }
-        }
-
-        // Clean up
-        std::fs::remove_file(script_path).ok();
+        temp_file
+            .write_all(script_content.as_bytes())
+            .expect("Failed to write script");
+        let script_path = temp_file.path().to_str().unwrap();
+
+        // Make script executable
+        std::fs::set_permissions(
+            script_path,
+            std::os::unix::fs::PermissionsExt::from_mode(0o755),
+        )
+        .expect("Failed to set permissions");
+
+        // Set up environment for fast sampling and the test script
+        env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
+        env::set_var("DATABUILD_JOB_BINARY", script_path);
+
+        // Create test sink and wrapper
+        let sink = TestSink::new();
+        let mut wrapper = JobWrapper::new_with_sink(sink);
+
+        // Create a JobConfig for the test
+        let config = JobConfig {
+            outputs: vec![PartitionRef {
+                r#str: "test/cpu".to_string(),
+            }],
+            inputs: vec![],
+            args: vec![],
+            env: {
+                let mut env_map = std::collections::HashMap::new();
+                env_map.insert("PARTITION_REF".to_string(), "test/cpu".to_string());
+                env_map
+            },
+        };
+
+        // We need to simulate stdin for exec_mode - let's create a test-specific exec method
+        // that takes the config directly rather than reading from stdin
+        let result = wrapper.exec_mode_with_config(script_path, config);
+
+        // Clean up environment
         env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
-
-        // Assert we captured non-zero CPU time
-        assert!(cpu_ms.is_some(), "Should have found job_summary event");
-        let cpu_time = cpu_ms.unwrap();
-        assert!(cpu_time > 0.0,
-            "Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms", cpu_time);
+        env::remove_var("DATABUILD_JOB_BINARY");
+
+        // Check that exec_mode succeeded
+        if let Err(e) = &result {
+            println!("exec_mode failed with error: {}", e);
+        }
+        assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
+
+        // Find the job_summary event
+        let summary_event = wrapper
+            .sink
+            .find_event("job_summary")
+            .expect("Should have job_summary event");
+
+        if let Some(job_log_entry::Content::JobEvent(event)) = &summary_event.content {
+            // Verify we have CPU metrics
+            let cpu_ms_str = event
+                .metadata
+                .get("total_cpu_ms")
+                .expect("Should have total_cpu_ms metric");
+            let cpu_ms: f64 = cpu_ms_str
+                .parse()
+                .expect("CPU metric should be valid float");
+
+            // For CPU-intensive work, we should get non-zero CPU time
+            assert!(
+                cpu_ms > 0.0,
+                "Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms",
+                cpu_ms
+            );
+
+            // Also verify runtime is reasonable
+            let runtime_ms_str = event
+                .metadata
+                .get("runtime_ms")
+                .expect("Should have runtime_ms metric");
+            let runtime_ms: f64 = runtime_ms_str
+                .parse()
+                .expect("Runtime metric should be valid float");
+            assert!(runtime_ms > 0.0, "Should have non-zero runtime");
+
+            println!(
+                "CPU test results: {:.3}ms CPU time over {:.3}ms runtime",
+                cpu_ms, runtime_ms
+            );
+        } else {
+            panic!("job_summary event should contain JobEvent");
+        }
     }
 }
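
The LogSink seam this commit introduces also makes the wrapper's event stream directly assertable without scraping stdout or spawning the wrapper binary. A hypothetical follow-up test sketched against the APIs above (not part of this commit):

#[test]
fn test_sink_captures_emitted_events() {
    let mut wrapper = JobWrapper::new_with_sink(TestSink::new());
    wrapper.emit_log(
        "test/ref",
        job_log_entry::Content::JobEvent(WrapperJobEvent {
            event_type: "config_validate_success".to_string(),
            job_status: None,
            exit_code: None,
            metadata: std::collections::HashMap::new(),
        }),
    );

    // Events land in the in-memory sink instead of stdout.
    assert_eq!(wrapper.sink.entries.len(), 1);
    assert!(wrapper.sink.find_event("config_validate_success").is_some());
}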