diff --git a/databuild/job/main.rs b/databuild/job/main.rs
index 022a1b9..a6dc83f 100644
--- a/databuild/job/main.rs
+++ b/databuild/job/main.rs
@@ -169,13 +169,13 @@ impl JobWrapper {
 
             // Check if process still exists
             if let Some(process) = system.process(pid) {
-                let memory_mb = process.memory() / 1024 / 1024;
+                let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                 let cpu_percent = process.cpu_usage();
 
                 // Create heartbeat event with metrics
                 let mut metadata = std::collections::HashMap::new();
-                metadata.insert("memory_usage_mb".to_string(), memory_mb.to_string());
-                metadata.insert("cpu_usage_percent".to_string(), cpu_percent.to_string());
+                metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
+                metadata.insert("cpu_usage_percent".to_string(), format!("{:.3}", cpu_percent));
 
                 // Get next sequence number for heartbeat
                 let seq = {
@@ -211,7 +211,7 @@ impl JobWrapper {
 
         let mut system = System::new_all();
         let pid = Pid::from(child_pid as usize);
-        let mut peak_memory_mb = 0u64;
+        let mut peak_memory_mb = 0.0f64;
         let mut cpu_samples = Vec::new();
         let mut stdout_buffer = Vec::new();
         let mut stderr_buffer = Vec::new();
@@ -232,7 +232,14 @@ impl JobWrapper {
         // Calculate final metrics
         let job_duration = job_start_time.elapsed()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?;
-        let total_cpu_ms = (cpu_samples.iter().sum::<f32>() * 10.0) as u64; // Convert to milliseconds
+
+        // Calculate CPU time: average CPU percentage * wall-clock time
+        let total_cpu_ms = if cpu_samples.is_empty() {
+            0.0
+        } else {
+            let avg_cpu_percent = cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
+            (avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
+        };
 
         // Stop heartbeat thread
         drop(heartbeat_handle);
@@ -256,13 +263,17 @@ impl JobWrapper {
                     system.refresh_processes_specifics(ProcessRefreshKind::new());
 
                     if let Some(process) = system.process(pid) {
-                        let memory_mb = process.memory() / 1024 / 1024;
+                        let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                         peak_memory_mb = peak_memory_mb.max(memory_mb);
                         cpu_samples.push(process.cpu_usage());
                     }
 
-                    // Sleep briefly before next poll (100ms)
-                    thread::sleep(Duration::from_millis(100));
+                    // Sleep briefly before next poll (configurable, default 100ms)
+                    let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
+                        .unwrap_or_else(|_| "100".to_string())
+                        .parse::<u64>()
+                        .unwrap_or(100);
+                    thread::sleep(Duration::from_millis(sample_interval_ms));
                 }
             }
         };
@@ -290,9 +301,9 @@ impl JobWrapper {
 
         // Emit job summary with resource metrics
        let mut summary_metadata = std::collections::HashMap::new();
-        summary_metadata.insert("runtime_ms".to_string(), job_duration.as_millis().to_string());
-        summary_metadata.insert("peak_memory_mb".to_string(), peak_memory_mb.to_string());
-        summary_metadata.insert("total_cpu_ms".to_string(), total_cpu_ms.to_string());
+        summary_metadata.insert("runtime_ms".to_string(), format!("{:.3}", job_duration.as_millis() as f64));
+        summary_metadata.insert("peak_memory_mb".to_string(), format!("{:.3}", peak_memory_mb));
+        summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
         summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
 
         self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
@@ -559,4 +570,78 @@ mod tests {
         assert!(!wrapper.job_id.is_empty());
         assert!(wrapper.start_time > 0);
     }
+
+    #[test]
+    fn test_cpu_metrics_are_captured() {
+        use std::process::{Command, Stdio};
+        use std::io::Write;
+
+        // Set faster sampling interval for test
+        env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
+
+        // Create a CPU-intensive Python script
+        let cpu_script = r#"
+import sys
+if len(sys.argv) > 1 and sys.argv[1] == "config":
+    print('{"configs":[{"outputs":[{"str":"test/cpu"}],"inputs":[],"args":[],"env":{}}]}')
+elif len(sys.argv) > 1 and sys.argv[1] == "exec":
+    # CPU-intensive work
+    total = sum(range(10_000_000))
+    print(f"Sum: {total}")
+"#;
+
+        let script_path = "/tmp/databuild_test_cpu.py";
+        std::fs::write(script_path, cpu_script).expect("Failed to write test script");
+
+        // Run wrapper with the CPU-intensive job
+        env::set_var("DATABUILD_JOB_BINARY", "python3");
+
+        let output = Command::new(env!("CARGO_BIN_EXE_job_wrapper"))
+            .arg("exec")
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .spawn()
+            .and_then(|mut child| {
+                // Send config to stdin
+                let config = JobConfig {
+                    outputs: vec![PartitionRef { r#str: "test/cpu".to_string() }],
+                    inputs: vec![],
+                    args: vec![script_path.to_string()],
+                    env: std::collections::HashMap::new(),
+                };
+                let config_json = serde_json::to_string(&config).unwrap();
+
+                if let Some(stdin) = child.stdin.as_mut() {
+                    stdin.write_all(config_json.as_bytes()).unwrap();
+                }
+
+                child.wait_with_output()
+            })
+            .expect("Failed to run wrapper");
+
+        // Parse output to find job_summary
+        let stdout = String::from_utf8_lossy(&output.stdout);
+        let mut cpu_ms = None;
+
+        for line in stdout.lines() {
+            if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
+                if json["content"]["JobEvent"]["event_type"] == "job_summary" {
+                    if let Some(cpu_str) = json["content"]["JobEvent"]["metadata"]["total_cpu_ms"].as_str() {
+                        cpu_ms = Some(cpu_str.parse::<f64>().unwrap());
+                    }
+                }
+            }
+        }
+
+        // Clean up
+        std::fs::remove_file(script_path).ok();
+        env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
+
+        // Assert we captured non-zero CPU time
+        assert!(cpu_ms.is_some(), "Should have found job_summary event");
+        let cpu_time = cpu_ms.unwrap();
+        assert!(cpu_time > 0.0,
+            "Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms", cpu_time);
+    }
 }
\ No newline at end of file