Use doubles for tracked metrics

This commit is contained in:
Stuart Axelbrooke 2025-07-28 18:49:52 -07:00
parent 79f316e0db
commit 41ea8f129c

View file

@ -169,13 +169,13 @@ impl JobWrapper {
// Check if process still exists // Check if process still exists
if let Some(process) = system.process(pid) { if let Some(process) = system.process(pid) {
let memory_mb = process.memory() / 1024 / 1024; let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
let cpu_percent = process.cpu_usage(); let cpu_percent = process.cpu_usage();
// Create heartbeat event with metrics // Create heartbeat event with metrics
let mut metadata = std::collections::HashMap::new(); let mut metadata = std::collections::HashMap::new();
metadata.insert("memory_usage_mb".to_string(), memory_mb.to_string()); metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
metadata.insert("cpu_usage_percent".to_string(), cpu_percent.to_string()); metadata.insert("cpu_usage_percent".to_string(), format!("{:.3}", cpu_percent));
// Get next sequence number for heartbeat // Get next sequence number for heartbeat
let seq = { let seq = {
@ -211,7 +211,7 @@ impl JobWrapper {
let mut system = System::new_all(); let mut system = System::new_all();
let pid = Pid::from(child_pid as usize); let pid = Pid::from(child_pid as usize);
let mut peak_memory_mb = 0u64; let mut peak_memory_mb = 0.0f64;
let mut cpu_samples = Vec::new(); let mut cpu_samples = Vec::new();
let mut stdout_buffer = Vec::new(); let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new(); let mut stderr_buffer = Vec::new();
@ -232,7 +232,14 @@ impl JobWrapper {
// Calculate final metrics // Calculate final metrics
let job_duration = job_start_time.elapsed() let job_duration = job_start_time.elapsed()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?;
let total_cpu_ms = (cpu_samples.iter().sum::<f32>() * 10.0) as u64; // Convert to milliseconds
// Calculate CPU time: average CPU percentage * wall-clock time
let total_cpu_ms = if cpu_samples.is_empty() {
0.0
} else {
let avg_cpu_percent = cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
(avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
};
// Stop heartbeat thread // Stop heartbeat thread
drop(heartbeat_handle); drop(heartbeat_handle);
@ -256,13 +263,17 @@ impl JobWrapper {
system.refresh_processes_specifics(ProcessRefreshKind::new()); system.refresh_processes_specifics(ProcessRefreshKind::new());
if let Some(process) = system.process(pid) { if let Some(process) = system.process(pid) {
let memory_mb = process.memory() / 1024 / 1024; let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
peak_memory_mb = peak_memory_mb.max(memory_mb); peak_memory_mb = peak_memory_mb.max(memory_mb);
cpu_samples.push(process.cpu_usage()); cpu_samples.push(process.cpu_usage());
} }
// Sleep briefly before next poll (100ms) // Sleep briefly before next poll (configurable, default 100ms)
thread::sleep(Duration::from_millis(100)); let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
.unwrap_or_else(|_| "100".to_string())
.parse::<u64>()
.unwrap_or(100);
thread::sleep(Duration::from_millis(sample_interval_ms));
} }
} }
}; };
@ -290,9 +301,9 @@ impl JobWrapper {
// Emit job summary with resource metrics // Emit job summary with resource metrics
let mut summary_metadata = std::collections::HashMap::new(); let mut summary_metadata = std::collections::HashMap::new();
summary_metadata.insert("runtime_ms".to_string(), job_duration.as_millis().to_string()); summary_metadata.insert("runtime_ms".to_string(), format!("{:.3}", job_duration.as_millis() as f64));
summary_metadata.insert("peak_memory_mb".to_string(), peak_memory_mb.to_string()); summary_metadata.insert("peak_memory_mb".to_string(), format!("{:.3}", peak_memory_mb));
summary_metadata.insert("total_cpu_ms".to_string(), total_cpu_ms.to_string()); summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
summary_metadata.insert("exit_code".to_string(), exit_code.to_string()); summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent { self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
@ -559,4 +570,78 @@ mod tests {
assert!(!wrapper.job_id.is_empty()); assert!(!wrapper.job_id.is_empty());
assert!(wrapper.start_time > 0); assert!(wrapper.start_time > 0);
} }
#[test]
fn test_cpu_metrics_are_captured() {
use std::process::{Command, Stdio};
use std::io::Write;
// Set faster sampling interval for test
env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
// Create a CPU-intensive Python script
let cpu_script = r#"
import sys
if len(sys.argv) > 1 and sys.argv[1] == "config":
print('{"configs":[{"outputs":[{"str":"test/cpu"}],"inputs":[],"args":[],"env":{}}]}')
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# CPU-intensive work
total = sum(range(10_000_000))
print(f"Sum: {total}")
"#;
let script_path = "/tmp/databuild_test_cpu.py";
std::fs::write(script_path, cpu_script).expect("Failed to write test script");
// Run wrapper with the CPU-intensive job
env::set_var("DATABUILD_JOB_BINARY", "python3");
let output = Command::new(env!("CARGO_BIN_EXE_job_wrapper"))
.arg("exec")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.and_then(|mut child| {
// Send config to stdin
let config = JobConfig {
outputs: vec![PartitionRef { r#str: "test/cpu".to_string() }],
inputs: vec![],
args: vec![script_path.to_string()],
env: std::collections::HashMap::new(),
};
let config_json = serde_json::to_string(&config).unwrap();
if let Some(stdin) = child.stdin.as_mut() {
stdin.write_all(config_json.as_bytes()).unwrap();
}
child.wait_with_output()
})
.expect("Failed to run wrapper");
// Parse output to find job_summary
let stdout = String::from_utf8_lossy(&output.stdout);
let mut cpu_ms = None;
for line in stdout.lines() {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
if json["content"]["JobEvent"]["event_type"] == "job_summary" {
if let Some(cpu_str) = json["content"]["JobEvent"]["metadata"]["total_cpu_ms"].as_str() {
cpu_ms = Some(cpu_str.parse::<f64>().unwrap());
}
}
}
}
// Clean up
std::fs::remove_file(script_path).ok();
env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
// Assert we captured non-zero CPU time
assert!(cpu_ms.is_some(), "Should have found job_summary event");
let cpu_time = cpu_ms.unwrap();
assert!(cpu_time > 0.0,
"Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms", cpu_time);
}
} }