diff --git a/databuild/job/main.rs b/databuild/job/main.rs
index 9f1f5ad..fe17f86 100644
--- a/databuild/job/main.rs
+++ b/databuild/job/main.rs
@@ -1,7 +1,7 @@
 use std::env;
 use std::io::{self, Read, Write};
 use std::process::{Command, Stdio};
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 use std::thread;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 // All serialization handled by protobuf serde derives
@@ -17,6 +17,17 @@ use databuild::{
 
 // All types now come from protobuf - no custom structs needed
 
+// Configuration constants
+const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000; // 30 seconds
+const DEFAULT_METRICS_INTERVAL_MS: u64 = 100; // 100 milliseconds
+const TEST_HEARTBEAT_INTERVAL_MS: u64 = 100; // Fast heartbeats for testing
+const TEST_METRICS_INTERVAL_MS: u64 = 50; // Fast metrics for testing
+
+#[derive(Debug)]
+struct HeartbeatMessage {
+    entry: JobLogEntry,
+}
+
 fn get_timestamp() -> String {
     SystemTime::now()
         .duration_since(UNIX_EPOCH)
@@ -193,18 +204,24 @@ impl JobWrapper {
             stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?;
         }
 
-        // Start heartbeat thread
+        // Start heartbeat thread with channel communication
         let heartbeat_job_id = self.job_id.clone();
        let heartbeat_partition_ref = partition_ref.clone();
         let heartbeat_sequence = Arc::new(Mutex::new(0u64));
         let heartbeat_sequence_clone = heartbeat_sequence.clone();
+        let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<HeartbeatMessage>();
 
         let heartbeat_handle = thread::spawn(move || {
             let mut system = System::new_all();
             let pid = Pid::from(child_pid as usize);
 
+            let heartbeat_interval_ms = env::var("DATABUILD_HEARTBEAT_INTERVAL_MS")
+                .unwrap_or_else(|_| DEFAULT_HEARTBEAT_INTERVAL_MS.to_string())
+                .parse::<u64>()
+                .unwrap_or(DEFAULT_HEARTBEAT_INTERVAL_MS);
+
             loop {
-                thread::sleep(Duration::from_secs(30));
+                thread::sleep(Duration::from_millis(heartbeat_interval_ms));
 
                 // Refresh process info
                 system.refresh_processes_specifics(ProcessRefreshKind::new());
@@ -242,8 +259,11 @@ impl JobWrapper {
                         })),
                     };
 
-                    // Print the heartbeat (thread-safe since println! is synchronized)
-                    println!("{}", serde_json::to_string(&heartbeat_event).unwrap());
+                    // Send heartbeat through channel instead of printing directly
+                    if heartbeat_tx.send(HeartbeatMessage { entry: heartbeat_event }).is_err() {
+                        // Main thread dropped receiver, exit
+                        break;
+                    }
                 } else {
                     // Process no longer exists, exit heartbeat thread
                     break;
@@ -253,13 +273,24 @@
 
         // Track metrics while job is running
         let job_start_time = SystemTime::now();
-        let mut system = System::new_all();
+        let mut system = System::new();
         let pid = Pid::from(child_pid as usize);
 
+        // Initial refresh to establish baseline for CPU measurements
+        system.refresh_cpu();
+        system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
+
         let mut peak_memory_mb = 0.0f64;
         let mut cpu_samples = Vec::new();
         let mut stdout_buffer = Vec::new();
         let mut stderr_buffer = Vec::new();
+
+        // Sleep briefly to allow the process to start up before measuring
+        let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
+            .unwrap_or_else(|_| DEFAULT_METRICS_INTERVAL_MS.to_string())
+            .parse::<u64>()
+            .unwrap_or(DEFAULT_METRICS_INTERVAL_MS);
+        thread::sleep(Duration::from_millis(sample_interval_ms));
 
         // Poll process status and metrics
         let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
@@ -294,6 +325,11 @@
             // Stop heartbeat thread
             drop(heartbeat_handle);
 
+            // Process any remaining heartbeat messages
+            while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
+                self.sink.emit(heartbeat_msg.entry);
+            }
+
             // Update sequence number to account for heartbeats
             let heartbeat_count = heartbeat_sequence.lock().unwrap();
             self.sequence_number = self.sequence_number.max(*heartbeat_count);
@@ -309,21 +345,29 @@
                     break (output, peak_memory_mb, total_cpu_ms, job_duration);
                 }
                 None => {
+                    // Check for heartbeat messages and emit them
+                    while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
+                        self.sink.emit(heartbeat_msg.entry);
+                    }
+
                     // Process still running, collect metrics
-                    system.refresh_processes_specifics(ProcessRefreshKind::new());
+                    // Refresh CPU info and processes
+                    system.refresh_cpu();
+                    system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
+
+                    // Sleep to allow CPU measurement interval
+                    thread::sleep(Duration::from_millis(sample_interval_ms));
+
+                    // Refresh again to get updated CPU usage
+                    system.refresh_cpu();
+                    system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
 
                     if let Some(process) = system.process(pid) {
                         let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
                         peak_memory_mb = peak_memory_mb.max(memory_mb);
-                        cpu_samples.push(process.cpu_usage());
+                        let cpu_usage = process.cpu_usage();
+                        cpu_samples.push(cpu_usage);
                     }
-
-                    // Sleep briefly before next poll (configurable, default 100ms)
-                    let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
-                        .unwrap_or_else(|_| "100".to_string())
-                        .parse::<u64>()
-                        .unwrap_or(100);
-                    thread::sleep(Duration::from_millis(sample_interval_ms));
                 }
             }
         };
@@ -693,7 +737,6 @@ mod tests {
     #[test]
     fn test_cpu_metrics_are_captured() {
         use std::io::Write;
-        use std::process::{Command, Stdio};
         use tempfile::NamedTempFile;
 
         // Create a CPU-intensive test script
@@ -701,6 +744,7 @@
     let script_content = r#"#!/usr/bin/env python3
 import sys
 import json
+import time
 
 if len(sys.argv) > 1 and sys.argv[1] == "config":
     config = {
@@ -711,8 +755,11 @@
     }
     print(json.dumps({"configs": [config]}))
"exec": - # CPU-intensive work - total = sum(range(50_000_000)) # Smaller for faster test + # CPU-intensive work that runs longer + start_time = time.time() + total = 0 + while time.time() - start_time < 0.5: # Run for at least 500ms + total += sum(range(1_000_000)) print(f"Sum: {total}") "#; @@ -729,7 +776,7 @@ elif len(sys.argv) > 1 and sys.argv[1] == "exec": .expect("Failed to set permissions"); // Set up environment for fast sampling and the test script - env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); + env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); // Even faster for CPU test env::set_var("DATABUILD_JOB_BINARY", script_path); // Create test sink and wrapper @@ -805,4 +852,124 @@ elif len(sys.argv) > 1 and sys.argv[1] == "exec": panic!("job_summary event should contain JobEvent"); } } + + #[test] + fn test_heartbeat_functionality() { + use std::io::Write; + use tempfile::NamedTempFile; + + // Create a longer-running test script to trigger heartbeats + let mut temp_file = NamedTempFile::new().expect("Failed to create temp file"); + let script_content = r#"#!/usr/bin/env python3 +import sys +import json +import time + +if len(sys.argv) > 1 and sys.argv[1] == "config": + config = { + "outputs": [{"str": "test/heartbeat"}], + "inputs": [], + "args": [], + "env": {"PARTITION_REF": "test/heartbeat"} + } + print(json.dumps({"configs": [config]})) +elif len(sys.argv) > 1 and sys.argv[1] == "exec": + # Sleep long enough to trigger at least 2 heartbeats + time.sleep(0.3) # 300ms with 100ms heartbeat interval should give us 2-3 heartbeats + print("Job completed") +"#; + + temp_file + .write_all(script_content.as_bytes()) + .expect("Failed to write script"); + let script_path = temp_file.path().to_str().unwrap(); + + // Make script executable + std::fs::set_permissions( + script_path, + std::os::unix::fs::PermissionsExt::from_mode(0o755), + ) + .expect("Failed to set permissions"); + + // Set up environment for fast heartbeats and the test script + env::set_var("DATABUILD_HEARTBEAT_INTERVAL_MS", &TEST_HEARTBEAT_INTERVAL_MS.to_string()); + env::set_var("DATABUILD_METRICS_INTERVAL_MS", &TEST_METRICS_INTERVAL_MS.to_string()); + env::set_var("DATABUILD_JOB_BINARY", script_path); + + // Create test sink and wrapper + let sink = TestSink::new(); + let mut wrapper = JobWrapper::new_with_sink(sink); + + // Create a JobConfig for the test + let config = JobConfig { + outputs: vec![PartitionRef { + r#str: "test/heartbeat".to_string(), + }], + inputs: vec![], + args: vec![], + env: { + let mut env_map = std::collections::HashMap::new(); + env_map.insert("PARTITION_REF".to_string(), "test/heartbeat".to_string()); + env_map + }, + }; + + // Run the job + let result = wrapper.exec_mode_with_config(script_path, config); + + // Clean up environment + env::remove_var("DATABUILD_HEARTBEAT_INTERVAL_MS"); + env::remove_var("DATABUILD_METRICS_INTERVAL_MS"); + env::remove_var("DATABUILD_JOB_BINARY"); + + // Check that exec_mode succeeded + assert!(result.is_ok(), "exec_mode should succeed: {:?}", result); + + // Count heartbeat events + let heartbeat_count = wrapper + .sink + .entries + .iter() + .filter(|entry| { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + event.event_type == "heartbeat" + } else { + false + } + }) + .count(); + + // We should have at least 1 heartbeat event (possibly 2-3 depending on timing) + assert!( + heartbeat_count >= 1, + "Expected at least 1 heartbeat event, but got {}", + heartbeat_count + ); + + // Verify heartbeat event structure + let 
+        let heartbeat_event = wrapper
+            .sink
+            .entries
+            .iter()
+            .find(|entry| {
+                if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content {
+                    event.event_type == "heartbeat"
+                } else {
+                    false
+                }
+            })
+            .expect("Should have at least one heartbeat event");
+
+        if let Some(job_log_entry::Content::JobEvent(event)) = &heartbeat_event.content {
+            // Verify heartbeat contains memory and CPU metrics
+            assert!(
+                event.metadata.contains_key("memory_usage_mb"),
+                "Heartbeat should contain memory_usage_mb"
+            );
+            assert!(
+                event.metadata.contains_key("cpu_usage_percent"),
+                "Heartbeat should contain cpu_usage_percent"
+            );
+        }
+    }
 }