diff --git a/databuild/job/BUILD.bazel b/databuild/job/BUILD.bazel
index dee4035..dc41474 100644
--- a/databuild/job/BUILD.bazel
+++ b/databuild/job/BUILD.bazel
@@ -22,5 +22,6 @@ rust_test(
         "@crates//:serde_json",
         "@crates//:uuid",
         "@crates//:sysinfo",
+        "@crates//:tempfile",
     ],
 )
diff --git a/databuild/job/main.rs b/databuild/job/main.rs
index a6dc83f..9f1f5ad 100644
--- a/databuild/job/main.rs
+++ b/databuild/job/main.rs
@@ -2,29 +2,56 @@
 use std::env;
 use std::io::{self, Read, Write};
 use std::process::{Command, Stdio};
 use std::sync::{Arc, Mutex};
-use std::time::{SystemTime, UNIX_EPOCH, Duration};
 use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 // All serialization handled by protobuf serde derives
 use serde_json;
+use sysinfo::{Pid, ProcessRefreshKind, System};
 use uuid::Uuid;
-use sysinfo::{System, ProcessRefreshKind, Pid};
 
 // Import protobuf types from databuild
 use databuild::{
-    PartitionRef, PartitionManifest, Task, JobLabel, JobConfig,
-    JobLogEntry, LogMessage, WrapperJobEvent, job_log_entry, log_message
+    job_log_entry, log_message, JobConfig, JobLabel, JobLogEntry, LogMessage, PartitionManifest,
+    PartitionRef, Task, WrapperJobEvent,
};
 
 // All types now come from protobuf - no custom structs needed
 
-struct JobWrapper {
+fn get_timestamp() -> String {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_secs()
+        .to_string()
+}
+
+trait LogSink {
+    fn emit(&mut self, entry: JobLogEntry);
+}
+
+struct StdoutSink;
+
+impl LogSink for StdoutSink {
+    fn emit(&mut self, entry: JobLogEntry) {
+        println!("{}", serde_json::to_string(&entry).unwrap());
+    }
+}
+
+struct JobWrapper<S: LogSink> {
     job_id: String,
     sequence_number: u64,
     start_time: i64,
+    sink: S,
 }
 
-impl JobWrapper {
+impl JobWrapper<StdoutSink> {
     fn new() -> Self {
+        Self::new_with_sink(StdoutSink)
+    }
+}
+
+impl<S: LogSink> JobWrapper<S> {
+    fn new_with_sink(sink: S) -> Self {
         Self {
             job_id: Uuid::new_v4().to_string(),
             sequence_number: 0,
@@ -32,17 +59,10 @@ impl JobWrapper {
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_secs() as i64,
+            sink,
         }
     }
 
-    fn get_timestamp() -> String {
-        SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_secs()
-            .to_string()
-    }
-
     fn next_sequence(&mut self) -> u64 {
         self.sequence_number += 1;
         self.sequence_number
@@ -50,32 +70,38 @@ impl JobWrapper {
 
     fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
         let entry = JobLogEntry {
-            timestamp: Self::get_timestamp(),
+            timestamp: get_timestamp(),
             job_id: self.job_id.clone(),
             partition_ref: partition_ref.to_string(),
             sequence_number: self.next_sequence(),
             content: Some(content),
         };
-
-        println!("{}", serde_json::to_string(&entry).unwrap());
+
+        self.sink.emit(entry);
     }
 
     fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
         // Parse the partition ref from args (first argument)
         let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();
-
+
         // Following the state diagram: wrapper_validate_config -> emit_config_validate_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "config_validate_success".to_string(),
-            metadata: std::collections::HashMap::new(),
-            job_status: None,
-            exit_code: None,
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "config_validate_success".to_string(),
+                metadata: std::collections::HashMap::new(),
+                job_status: None,
+                exit_code: None,
+            }),
+        );
 
         // For Phase 0, we still need to produce the expected JSON config format
         // so the current graph system can parse it. Later phases will change this.
         let config = JobConfig {
-            outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
+            outputs: outputs
+                .iter()
+                .map(|s| PartitionRef { r#str: s.clone() })
+                .collect(),
             inputs: vec![],
             args: outputs.clone(),
             env: {
@@ -92,9 +118,9 @@ impl JobWrapper {
         let configs_wrapper = serde_json::json!({
             "configs": [config]
         });
-
+
         println!("{}", serde_json::to_string(&configs_wrapper)?);
-
+
         Ok(())
     }
 
@@ -102,38 +128,54 @@ impl JobWrapper {
         // Read the job config from stdin
         let mut buffer = String::new();
         io::stdin().read_to_string(&mut buffer)?;
-
+
         let config: JobConfig = serde_json::from_str(&buffer)?;
-        let partition_ref = config.outputs.first()
+        self.exec_mode_with_config(job_binary, config)
+    }
+
+    fn exec_mode_with_config(
+        &mut self,
+        job_binary: &str,
+        config: JobConfig,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        let partition_ref = config
+            .outputs
+            .first()
             .map(|p| p.str.clone())
             .unwrap_or_else(|| "unknown".to_string());
 
         // Following the state diagram:
         // 1. wrapper_validate_config -> emit_config_validate_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "config_validate_success".to_string(),
-            job_status: None,
-            exit_code: None,
-            metadata: std::collections::HashMap::new(),
-        }));
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "config_validate_success".to_string(),
+                job_status: None,
+                exit_code: None,
+                metadata: std::collections::HashMap::new(),
+            }),
+        );
 
-        // 2. wrapper_launch_task -> emit_task_launch_success
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "task_launch_success".to_string(),
-            job_status: None,
-            exit_code: None,
-            metadata: std::collections::HashMap::new(),
-        }));
+        // 2. wrapper_launch_task -> emit_task_launch_success
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "task_launch_success".to_string(),
+                job_status: None,
+                exit_code: None,
+                metadata: std::collections::HashMap::new(),
+            }),
+        );
 
         // Execute the original job binary with the exec subcommand
         let mut cmd = Command::new(job_binary);
         cmd.arg("exec");
-
+
         // Add the args from the config
         for arg in &config.args {
             cmd.arg(arg);
         }
-
+
         cmd.stdin(Stdio::piped())
             .stdout(Stdio::piped())
             .stderr(Stdio::piped());
@@ -148,7 +190,7 @@ impl JobWrapper {
 
         // Send the config to the job
         if let Some(stdin) = child.stdin.as_mut() {
-            stdin.write_all(buffer.as_bytes())?;
+            stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?;
         }
 
         // Start heartbeat thread
@@ -156,36 +198,39 @@ impl JobWrapper {
         let heartbeat_partition_ref = partition_ref.clone();
         let heartbeat_sequence = Arc::new(Mutex::new(0u64));
         let heartbeat_sequence_clone = heartbeat_sequence.clone();
-
+
         let heartbeat_handle = thread::spawn(move || {
             let mut system = System::new_all();
             let pid = Pid::from(child_pid as usize);
-
+
             loop {
                 thread::sleep(Duration::from_secs(30));
-
+
                 // Refresh process info
                 system.refresh_processes_specifics(ProcessRefreshKind::new());
-
+
                 // Check if process still exists
                 if let Some(process) = system.process(pid) {
                     let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                     let cpu_percent = process.cpu_usage();
-
+
                     // Create heartbeat event with metrics
                     let mut metadata = std::collections::HashMap::new();
                     metadata.insert("memory_usage_mb".to_string(), format!("{:.3}", memory_mb));
-                    metadata.insert("cpu_usage_percent".to_string(), format!("{:.3}", cpu_percent));
-
+                    metadata.insert(
+                        "cpu_usage_percent".to_string(),
+                        format!("{:.3}", cpu_percent),
+                    );
+
                     // Get next sequence number for heartbeat
                     let seq = {
                         let mut seq_lock = heartbeat_sequence_clone.lock().unwrap();
                         *seq_lock += 1;
                         *seq_lock
                     };
-
+
                     let heartbeat_event = JobLogEntry {
-                        timestamp: JobWrapper::get_timestamp(),
+                        timestamp: get_timestamp(),
                         job_id: heartbeat_job_id.clone(),
                         partition_ref: heartbeat_partition_ref.clone(),
                         sequence_number: seq,
@@ -196,7 +241,7 @@ impl JobWrapper {
                             metadata,
                         })),
                     };
-
+
                     // Print the heartbeat (thread-safe since println! is synchronized)
                     println!("{}", serde_json::to_string(&heartbeat_event).unwrap());
                 } else {
@@ -210,12 +255,12 @@ impl JobWrapper {
         let job_start_time = SystemTime::now();
         let mut system = System::new_all();
         let pid = Pid::from(child_pid as usize);
-
+
         let mut peak_memory_mb = 0.0f64;
         let mut cpu_samples = Vec::new();
         let mut stdout_buffer = Vec::new();
         let mut stderr_buffer = Vec::new();
-
+
         // Poll process status and metrics
         let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
             // Check if process has exited
@@ -228,46 +273,51 @@ impl JobWrapper {
                     if let Some(mut stderr) = child.stderr.take() {
                         stderr.read_to_end(&mut stderr_buffer)?;
                     }
-
+
                     // Calculate final metrics
-                    let job_duration = job_start_time.elapsed()
-                        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?;
-
+                    let job_duration = job_start_time.elapsed().map_err(|e| {
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!("Time calculation error: {}", e),
+                        )
+                    })?;
+
                     // Calculate CPU time: average CPU percentage * wall-clock time
                     let total_cpu_ms = if cpu_samples.is_empty() {
                         0.0
                     } else {
-                        let avg_cpu_percent = cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
+                        let avg_cpu_percent =
+                            cpu_samples.iter().sum::<f32>() as f64 / cpu_samples.len() as f64;
                         (avg_cpu_percent / 100.0) * job_duration.as_millis() as f64
                     };
-
+
                     // Stop heartbeat thread
                     drop(heartbeat_handle);
-
+
                     // Update sequence number to account for heartbeats
                     let heartbeat_count = heartbeat_sequence.lock().unwrap();
                     self.sequence_number = self.sequence_number.max(*heartbeat_count);
                     drop(heartbeat_count);
-
+
                     // Create output struct to match original behavior
                     let output = std::process::Output {
                         status,
                         stdout: stdout_buffer,
                         stderr: stderr_buffer,
                     };
-
+
                     break (output, peak_memory_mb, total_cpu_ms, job_duration);
                 }
                 None => {
                     // Process still running, collect metrics
                     system.refresh_processes_specifics(ProcessRefreshKind::new());
-
+
                     if let Some(process) = system.process(pid) {
                         let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                         peak_memory_mb = peak_memory_mb.max(memory_mb);
                         cpu_samples.push(process.cpu_usage());
                    }
-
+
                     // Sleep briefly before next poll (configurable, default 100ms)
                     let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
                         .unwrap_or_else(|_| "100".to_string())
@@ -283,44 +333,62 @@ impl JobWrapper {
         // Capture and forward job stdout/stderr as log messages
         if !output.stdout.is_empty() {
             let stdout_str = String::from_utf8_lossy(&output.stdout);
-            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
-                level: log_message::LogLevel::Info as i32,
-                message: stdout_str.to_string(),
-                fields: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Log(LogMessage {
+                    level: log_message::LogLevel::Info as i32,
+                    message: stdout_str.to_string(),
+                    fields: std::collections::HashMap::new(),
+                }),
+            );
         }
 
         if !output.stderr.is_empty() {
             let stderr_str = String::from_utf8_lossy(&output.stderr);
-            self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
-                level: log_message::LogLevel::Error as i32,
-                message: stderr_str.to_string(),
-                fields: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Log(LogMessage {
+                    level: log_message::LogLevel::Error as i32,
+                    message: stderr_str.to_string(),
+                    fields: std::collections::HashMap::new(),
+                }),
+            );
         }
 
         // Emit job summary with resource metrics
         let mut summary_metadata = std::collections::HashMap::new();
-        summary_metadata.insert("runtime_ms".to_string(), format!("{:.3}", job_duration.as_millis() as f64));
-        summary_metadata.insert("peak_memory_mb".to_string(), format!("{:.3}", peak_memory_mb));
+        summary_metadata.insert(
+            "runtime_ms".to_string(),
+            format!("{:.3}", job_duration.as_millis() as f64),
+        );
+        summary_metadata.insert(
+            "peak_memory_mb".to_string(),
+            format!("{:.3}", peak_memory_mb),
+        );
         summary_metadata.insert("total_cpu_ms".to_string(), format!("{:.3}", total_cpu_ms));
         summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
-
-        self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-            event_type: "job_summary".to_string(),
-            job_status: None,
-            exit_code: Some(exit_code),
-            metadata: summary_metadata,
-        }));
+
+        self.emit_log(
+            &partition_ref,
+            job_log_entry::Content::JobEvent(WrapperJobEvent {
+                event_type: "job_summary".to_string(),
+                job_status: None,
+                exit_code: Some(exit_code),
+                metadata: summary_metadata,
+            }),
+        );
 
         if success {
             // Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "task_success".to_string(),
-                job_status: Some("JOB_COMPLETED".to_string()),
-                exit_code: Some(exit_code),
-                metadata: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::JobEvent(WrapperJobEvent {
+                    event_type: "task_success".to_string(),
+                    job_status: Some("JOB_COMPLETED".to_string()),
+                    exit_code: Some(exit_code),
+                    metadata: std::collections::HashMap::new(),
+                }),
+            );
 
             // Then emit_partition_manifest -> success
             let end_time = SystemTime::now()
@@ -328,39 +396,52 @@ impl JobWrapper {
                 .unwrap()
                 .as_secs() as i64;
 
-            self.emit_log(&partition_ref, job_log_entry::Content::Manifest(PartitionManifest {
-                outputs: config.outputs.clone(),
-                inputs: vec![], // Phase 0: no input manifests yet
-                start_time: self.start_time,
-                end_time,
-                task: Some(Task {
-                    job: Some(JobLabel {
-                        label: env::var("DATABUILD_JOB_LABEL").unwrap_or_else(|_| "unknown".to_string()),
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::Manifest(PartitionManifest {
+                    outputs: config.outputs.clone(),
+                    inputs: vec![], // Phase 0: no input manifests yet
+                    start_time: self.start_time,
+                    end_time,
+                    task: Some(Task {
+                        job: Some(JobLabel {
+                            label: env::var("DATABUILD_JOB_LABEL")
+                                .unwrap_or_else(|_| "unknown".to_string()),
+                        }),
+                        config: Some(config.clone()),
                     }),
-                    config: Some(config.clone()),
+                    metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
                 }),
-                metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
-            }));
+            );
         } else {
             // Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "task_failed".to_string(),
-                job_status: Some("JOB_FAILED".to_string()),
-                exit_code: Some(exit_code),
-                metadata: std::collections::HashMap::new(),
-            }));
+            self.emit_log(
+                &partition_ref,
+                job_log_entry::Content::JobEvent(WrapperJobEvent {
+                    event_type: "task_failed".to_string(),
+                    job_status: Some("JOB_FAILED".to_string()),
+                    exit_code: Some(exit_code),
+                    metadata: std::collections::HashMap::new(),
+                }),
+            );
 
             // Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
-            self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
-                event_type: "job_exec_fail".to_string(),
-                job_status: Some("JOB_FAILED".to_string()),
Some("JOB_FAILED".to_string()), - exit_code: Some(exit_code), - metadata: { - let mut meta = std::collections::HashMap::new(); - meta.insert("error".to_string(), format!("Job failed with exit code {}", exit_code)); - meta - }, - })); + self.emit_log( + &partition_ref, + job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "job_exec_fail".to_string(), + job_status: Some("JOB_FAILED".to_string()), + exit_code: Some(exit_code), + metadata: { + let mut meta = std::collections::HashMap::new(); + meta.insert( + "error".to_string(), + format!("Job failed with exit code {}", exit_code), + ); + meta + }, + }), + ); } // Forward the original job's output to stdout for compatibility @@ -377,7 +458,7 @@ impl JobWrapper { fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); - + if args.len() < 2 { eprintln!("Usage: job_wrapper [args...]"); std::process::exit(1); @@ -394,9 +475,9 @@ fn main() -> Result<(), Box> { "exec" => { // For exec mode, we need to know which original job binary to call // For Phase 0, we'll derive this from environment or make it configurable - let job_binary = env::var("DATABUILD_JOB_BINARY") - .unwrap_or_else(|_| "python3".to_string()); // Default fallback - + let job_binary = + env::var("DATABUILD_JOB_BINARY").unwrap_or_else(|_| "python3".to_string()); // Default fallback + wrapper.exec_mode(&job_binary)?; } _ => { @@ -411,11 +492,43 @@ fn main() -> Result<(), Box> { #[cfg(test)] mod tests { use super::*; - + + // Test infrastructure + struct TestSink { + entries: Vec, + } + + impl TestSink { + fn new() -> Self { + Self { + entries: Vec::new(), + } + } + + fn find_event(&self, event_type: &str) -> Option<&JobLogEntry> { + self.entries.iter().find(|entry| { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + event.event_type == event_type + } else { + false + } + }) + } + } + + impl LogSink for TestSink { + fn emit(&mut self, entry: JobLogEntry) { + self.entries.push(entry); + } + } + // Helper functions for testing fn generate_test_config(outputs: &[String]) -> JobConfig { JobConfig { - outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(), + outputs: outputs + .iter() + .map(|s| PartitionRef { r#str: s.clone() }) + .collect(), inputs: vec![], args: outputs.to_vec(), env: { @@ -441,11 +554,11 @@ mod tests { fields: std::collections::HashMap::new(), })), }; - + let json = serde_json::to_string(&entry).unwrap(); assert!(json.contains("\"timestamp\":\"1234567890\"")); assert!(json.contains("\"sequence_number\":1")); - assert!(json.contains("\"Log\":{")); // Capitalized field name + assert!(json.contains("\"Log\":{")); // Capitalized field name assert!(json.contains("\"message\":\"test message\"")); } @@ -461,12 +574,15 @@ mod tests { fn test_config_mode_output_format() { let outputs = vec!["test/partition".to_string()]; let config = generate_test_config(&outputs); - + // Verify it produces expected structure assert_eq!(config.outputs.len(), 1); assert_eq!(config.outputs[0].r#str, "test/partition"); assert_eq!(config.args, outputs); - assert_eq!(config.env.get("PARTITION_REF"), Some(&"test/partition".to_string())); + assert_eq!( + config.env.get("PARTITION_REF"), + Some(&"test/partition".to_string()) + ); } #[test] @@ -476,12 +592,15 @@ mod tests { "reviews/date=2025-01-02".to_string(), ]; let config = generate_test_config(&outputs); - + assert_eq!(config.outputs.len(), 2); assert_eq!(config.outputs[0].r#str, "reviews/date=2025-01-01"); assert_eq!(config.outputs[1].r#str, "reviews/date=2025-01-02"); // 
-        assert_eq!(config.env.get("PARTITION_REF"), Some(&"reviews/date=2025-01-01".to_string()));
+        assert_eq!(
+            config.env.get("PARTITION_REF"),
+            Some(&"reviews/date=2025-01-01".to_string())
+        );
     }
 
     #[test]
@@ -551,14 +670,14 @@ mod tests {
 
     #[test]
     fn test_timestamp_generation() {
-        let ts1 = JobWrapper::get_timestamp();
+        let ts1 = get_timestamp();
         std::thread::sleep(std::time::Duration::from_millis(10));
-        let ts2 = JobWrapper::get_timestamp();
-
+        let ts2 = get_timestamp();
+
         // Timestamps should be parseable as integers
         let t1: u64 = ts1.parse().expect("Should be valid timestamp");
         let t2: u64 = ts2.parse().expect("Should be valid timestamp");
-
+
         // Second timestamp should be equal or greater
         assert!(t2 >= t1);
     }
@@ -573,75 +692,117 @@ mod tests {
 
     #[test]
     fn test_cpu_metrics_are_captured() {
-        use std::process::{Command, Stdio};
         use std::io::Write;
-
-        // Set faster sampling interval for test
-        env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
-
-        // Create a CPU-intensive Python script
-        let cpu_script = r#"
+        use std::process::{Command, Stdio};
+        use tempfile::NamedTempFile;
+
+        // Create a CPU-intensive test script
+        let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
+        let script_content = r#"#!/usr/bin/env python3
 import sys
+import json
+
 if len(sys.argv) > 1 and sys.argv[1] == "config":
-    print('{"configs":[{"outputs":[{"str":"test/cpu"}],"inputs":[],"args":[],"env":{}}]}')
+    config = {
+        "outputs": [{"str": "test/cpu"}],
+        "inputs": [],
+        "args": [],
+        "env": {"PARTITION_REF": "test/cpu"}
+    }
+    print(json.dumps({"configs": [config]}))
 elif len(sys.argv) > 1 and sys.argv[1] == "exec":
     # CPU-intensive work
-    total = sum(range(10_000_000))
+    total = sum(range(50_000_000))  # Enough work for measurable CPU time
     print(f"Sum: {total}")
 "#;
-
-        let script_path = "/tmp/databuild_test_cpu.py";
-        std::fs::write(script_path, cpu_script).expect("Failed to write test script");
-
-        // Run wrapper with the CPU-intensive job
-        env::set_var("DATABUILD_JOB_BINARY", "python3");
-
-        let output = Command::new(env!("CARGO_BIN_EXE_job_wrapper"))
-            .arg("exec")
-            .stdin(Stdio::piped())
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .spawn()
-            .and_then(|mut child| {
-                // Send config to stdin
-                let config = JobConfig {
-                    outputs: vec![PartitionRef { r#str: "test/cpu".to_string() }],
-                    inputs: vec![],
-                    args: vec![script_path.to_string()],
-                    env: std::collections::HashMap::new(),
-                };
-                let config_json = serde_json::to_string(&config).unwrap();
-
-                if let Some(stdin) = child.stdin.as_mut() {
-                    stdin.write_all(config_json.as_bytes()).unwrap();
-                }
-
-                child.wait_with_output()
-            })
-            .expect("Failed to run wrapper");
-
-        // Parse output to find job_summary
-        let stdout = String::from_utf8_lossy(&output.stdout);
-        let mut cpu_ms = None;
-
-        for line in stdout.lines() {
-            if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
-                if json["content"]["JobEvent"]["event_type"] == "job_summary" {
-                    if let Some(cpu_str) = json["content"]["JobEvent"]["metadata"]["total_cpu_ms"].as_str() {
-                        cpu_ms = Some(cpu_str.parse::<f64>().unwrap());
-                    }
-                }
-            }
-        }
-
-        // Clean up
-        std::fs::remove_file(script_path).ok();
+
+        temp_file
+            .write_all(script_content.as_bytes())
+            .expect("Failed to write script");
+        let script_path = temp_file.path().to_str().unwrap();
+
+        // Make script executable
+        std::fs::set_permissions(
+            script_path,
+            std::os::unix::fs::PermissionsExt::from_mode(0o755),
+        )
+        .expect("Failed to set permissions");
+
+        // Set up environment for fast sampling and the test script
+        env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10");
+        env::set_var("DATABUILD_JOB_BINARY", script_path);
+
+        // Create test sink and wrapper
+        let sink = TestSink::new();
+        let mut wrapper = JobWrapper::new_with_sink(sink);
+
+        // Create a JobConfig for the test
+        let config = JobConfig {
+            outputs: vec![PartitionRef {
+                r#str: "test/cpu".to_string(),
+            }],
+            inputs: vec![],
+            args: vec![],
+            env: {
+                let mut env_map = std::collections::HashMap::new();
+                env_map.insert("PARTITION_REF".to_string(), "test/cpu".to_string());
+                env_map
+            },
+        };
+
+        // We need to simulate stdin for exec_mode - let's create a test-specific exec method
+        // that takes the config directly rather than reading from stdin
+        let result = wrapper.exec_mode_with_config(script_path, config);
+
+        // Clean up environment
         env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
-
-        // Assert we captured non-zero CPU time
-        assert!(cpu_ms.is_some(), "Should have found job_summary event");
-        let cpu_time = cpu_ms.unwrap();
-        assert!(cpu_time > 0.0,
-            "Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms", cpu_time);
+        env::remove_var("DATABUILD_JOB_BINARY");
+
+        // Check that exec_mode succeeded
+        if let Err(e) = &result {
+            println!("exec_mode failed with error: {}", e);
+        }
+        assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);
+
+        // Find the job_summary event
+        let summary_event = wrapper
+            .sink
+            .find_event("job_summary")
+            .expect("Should have job_summary event");
+
+        if let Some(job_log_entry::Content::JobEvent(event)) = &summary_event.content {
+            // Verify we have CPU metrics
+            let cpu_ms_str = event
+                .metadata
+                .get("total_cpu_ms")
+                .expect("Should have total_cpu_ms metric");
+            let cpu_ms: f64 = cpu_ms_str
+                .parse()
+                .expect("CPU metric should be valid float");
+
+            // For CPU-intensive work, we should get non-zero CPU time
+            assert!(
+                cpu_ms > 0.0,
+                "Expected non-zero CPU time for CPU-intensive workload, but got {:.3}ms",
+                cpu_ms
+            );
+
+            // Also verify runtime is reasonable
+            let runtime_ms_str = event
+                .metadata
+                .get("runtime_ms")
+                .expect("Should have runtime_ms metric");
+            let runtime_ms: f64 = runtime_ms_str
+                .parse()
+                .expect("Runtime metric should be valid float");
+            assert!(runtime_ms > 0.0, "Should have non-zero runtime");
+
+            println!(
+                "CPU test results: {:.3}ms CPU time over {:.3}ms runtime",
+                cpu_ms, runtime_ms
+            );
+        } else {
+            panic!("job_summary event should contain JobEvent");
        }
     }
-}
\ No newline at end of file
+}
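
Note (reviewer addition, not part of the patch): the `LogSink` trait introduced above is the seam that lets tests capture `JobLogEntry` values in memory instead of scraping the wrapper's stdout. A minimal sketch of how that seam can be exercised, as a hypothetical extra test placed inside the existing `mod tests` block (where `TestSink` is defined):

```rust
#[test]
fn emit_log_goes_to_the_injected_sink() {
    // Build a wrapper over the in-memory TestSink instead of StdoutSink.
    let mut wrapper = JobWrapper::new_with_sink(TestSink::new());

    // Emit a log line through the same code path production uses.
    wrapper.emit_log(
        "example/partition",
        job_log_entry::Content::Log(LogMessage {
            level: log_message::LogLevel::Info as i32,
            message: "hello".to_string(),
            fields: std::collections::HashMap::new(),
        }),
    );

    // The entry is captured in memory; no stdout parsing required.
    assert_eq!(wrapper.sink.entries.len(), 1);
    assert_eq!(wrapper.sink.entries[0].partition_ref, "example/partition");
    assert_eq!(wrapper.sink.entries[0].sequence_number, 1);
}
```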