Update heartbeating to use emit DI and add tests
Some checks are pending
/ setup (push) Waiting to run

This commit is contained in:
Stuart Axelbrooke 2025-07-28 19:35:20 -07:00
parent 7fd8b0a0d5
commit 3c4d3d89db

View file

@ -1,7 +1,7 @@
use std::env; use std::env;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
// All serialization handled by protobuf serde derives // All serialization handled by protobuf serde derives
@ -17,6 +17,17 @@ use databuild::{
// All types now come from protobuf - no custom structs needed // All types now come from protobuf - no custom structs needed
// Configuration constants

/// Fallback heartbeat period (30 seconds), used when the
/// `DATABUILD_HEARTBEAT_INTERVAL_MS` environment variable is unset or is not a
/// valid `u64`.
const DEFAULT_HEARTBEAT_INTERVAL_MS: u64 = 30_000;

/// Fallback metrics sampling period (100 milliseconds), used when the
/// `DATABUILD_METRICS_INTERVAL_MS` environment variable is unset or is not a
/// valid `u64`.
const DEFAULT_METRICS_INTERVAL_MS: u64 = 100;

/// Fast heartbeat period for tests, so a short-lived job still produces heartbeats.
/// Only compiled for test builds to avoid a dead-code warning elsewhere.
#[cfg(test)]
const TEST_HEARTBEAT_INTERVAL_MS: u64 = 100;

/// Fast metrics sampling period for tests.
/// Only compiled for test builds to avoid a dead-code warning elsewhere.
#[cfg(test)]
const TEST_METRICS_INTERVAL_MS: u64 = 50;
/// Message carried over the `mpsc` channel from the heartbeat thread to the
/// main thread.
///
/// The heartbeat thread sends one of these per heartbeat instead of printing
/// the event itself; the main thread drains the channel with `try_recv` and
/// passes `entry` to its sink's `emit`.
#[derive(Debug)]
struct HeartbeatMessage {
/// The heartbeat log entry to be emitted by the receiving side.
entry: JobLogEntry,
}
fn get_timestamp() -> String { fn get_timestamp() -> String {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
@ -193,18 +204,24 @@ impl<S: LogSink> JobWrapper<S> {
stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?; stdin.write_all(serde_json::to_string(&config).unwrap().as_bytes())?;
} }
// Start heartbeat thread // Start heartbeat thread with channel communication
let heartbeat_job_id = self.job_id.clone(); let heartbeat_job_id = self.job_id.clone();
let heartbeat_partition_ref = partition_ref.clone(); let heartbeat_partition_ref = partition_ref.clone();
let heartbeat_sequence = Arc::new(Mutex::new(0u64)); let heartbeat_sequence = Arc::new(Mutex::new(0u64));
let heartbeat_sequence_clone = heartbeat_sequence.clone(); let heartbeat_sequence_clone = heartbeat_sequence.clone();
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<HeartbeatMessage>();
let heartbeat_handle = thread::spawn(move || { let heartbeat_handle = thread::spawn(move || {
let mut system = System::new_all(); let mut system = System::new_all();
let pid = Pid::from(child_pid as usize); let pid = Pid::from(child_pid as usize);
let heartbeat_interval_ms = env::var("DATABUILD_HEARTBEAT_INTERVAL_MS")
.unwrap_or_else(|_| DEFAULT_HEARTBEAT_INTERVAL_MS.to_string())
.parse::<u64>()
.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL_MS);
loop { loop {
thread::sleep(Duration::from_secs(30)); thread::sleep(Duration::from_millis(heartbeat_interval_ms));
// Refresh process info // Refresh process info
system.refresh_processes_specifics(ProcessRefreshKind::new()); system.refresh_processes_specifics(ProcessRefreshKind::new());
@ -242,8 +259,11 @@ impl<S: LogSink> JobWrapper<S> {
})), })),
}; };
// Print the heartbeat (thread-safe since println! is synchronized) // Send heartbeat through channel instead of printing directly
println!("{}", serde_json::to_string(&heartbeat_event).unwrap()); if heartbeat_tx.send(HeartbeatMessage { entry: heartbeat_event }).is_err() {
// Main thread dropped receiver, exit
break;
}
} else { } else {
// Process no longer exists, exit heartbeat thread // Process no longer exists, exit heartbeat thread
break; break;
@ -253,13 +273,24 @@ impl<S: LogSink> JobWrapper<S> {
// Track metrics while job is running // Track metrics while job is running
let job_start_time = SystemTime::now(); let job_start_time = SystemTime::now();
let mut system = System::new_all(); let mut system = System::new();
let pid = Pid::from(child_pid as usize); let pid = Pid::from(child_pid as usize);
// Initial refresh to establish baseline for CPU measurements
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
let mut peak_memory_mb = 0.0f64; let mut peak_memory_mb = 0.0f64;
let mut cpu_samples = Vec::new(); let mut cpu_samples = Vec::new();
let mut stdout_buffer = Vec::new(); let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new(); let mut stderr_buffer = Vec::new();
// Sleep briefly to allow the process to start up before measuring
let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
.unwrap_or_else(|_| DEFAULT_METRICS_INTERVAL_MS.to_string())
.parse::<u64>()
.unwrap_or(DEFAULT_METRICS_INTERVAL_MS);
thread::sleep(Duration::from_millis(sample_interval_ms));
// Poll process status and metrics // Poll process status and metrics
let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop { let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
@ -294,6 +325,11 @@ impl<S: LogSink> JobWrapper<S> {
// Stop heartbeat thread // Stop heartbeat thread
drop(heartbeat_handle); drop(heartbeat_handle);
// Process any remaining heartbeat messages
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Update sequence number to account for heartbeats // Update sequence number to account for heartbeats
let heartbeat_count = heartbeat_sequence.lock().unwrap(); let heartbeat_count = heartbeat_sequence.lock().unwrap();
self.sequence_number = self.sequence_number.max(*heartbeat_count); self.sequence_number = self.sequence_number.max(*heartbeat_count);
@ -309,21 +345,29 @@ impl<S: LogSink> JobWrapper<S> {
break (output, peak_memory_mb, total_cpu_ms, job_duration); break (output, peak_memory_mb, total_cpu_ms, job_duration);
} }
None => { None => {
// Check for heartbeat messages and emit them
while let Ok(heartbeat_msg) = heartbeat_rx.try_recv() {
self.sink.emit(heartbeat_msg.entry);
}
// Process still running, collect metrics // Process still running, collect metrics
system.refresh_processes_specifics(ProcessRefreshKind::new()); // Refresh CPU info and processes
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
// Sleep to allow CPU measurement interval
thread::sleep(Duration::from_millis(sample_interval_ms));
// Refresh again to get updated CPU usage
system.refresh_cpu();
system.refresh_processes_specifics(ProcessRefreshKind::new().with_cpu());
if let Some(process) = system.process(pid) { if let Some(process) = system.process(pid) {
let memory_mb = process.memory() as f64 / 1024.0 / 1024.0; let memory_mb = process.memory() as f64 / 1024.0 / 1024.0;
peak_memory_mb = peak_memory_mb.max(memory_mb); peak_memory_mb = peak_memory_mb.max(memory_mb);
cpu_samples.push(process.cpu_usage()); let cpu_usage = process.cpu_usage();
cpu_samples.push(cpu_usage);
} }
// Sleep briefly before next poll (configurable, default 100ms)
let sample_interval_ms = env::var("DATABUILD_METRICS_INTERVAL_MS")
.unwrap_or_else(|_| "100".to_string())
.parse::<u64>()
.unwrap_or(100);
thread::sleep(Duration::from_millis(sample_interval_ms));
} }
} }
}; };
@ -693,7 +737,6 @@ mod tests {
#[test] #[test]
fn test_cpu_metrics_are_captured() { fn test_cpu_metrics_are_captured() {
use std::io::Write; use std::io::Write;
use std::process::{Command, Stdio};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
// Create a CPU-intensive test script // Create a CPU-intensive test script
@ -701,6 +744,7 @@ mod tests {
let script_content = r#"#!/usr/bin/env python3 let script_content = r#"#!/usr/bin/env python3
import sys import sys
import json import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config": if len(sys.argv) > 1 and sys.argv[1] == "config":
config = { config = {
@ -711,8 +755,11 @@ if len(sys.argv) > 1 and sys.argv[1] == "config":
} }
print(json.dumps({"configs": [config]})) print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec": elif len(sys.argv) > 1 and sys.argv[1] == "exec":
# CPU-intensive work # CPU-intensive work that runs longer
total = sum(range(50_000_000)) # Smaller for faster test start_time = time.time()
total = 0
while time.time() - start_time < 0.5: # Run for at least 500ms
total += sum(range(1_000_000))
print(f"Sum: {total}") print(f"Sum: {total}")
"#; "#;
@ -729,7 +776,7 @@ elif len(sys.argv) > 1 and sys.argv[1] == "exec":
.expect("Failed to set permissions"); .expect("Failed to set permissions");
// Set up environment for fast sampling and the test script // Set up environment for fast sampling and the test script
env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); env::set_var("DATABUILD_METRICS_INTERVAL_MS", "10"); // Even faster for CPU test
env::set_var("DATABUILD_JOB_BINARY", script_path); env::set_var("DATABUILD_JOB_BINARY", script_path);
// Create test sink and wrapper // Create test sink and wrapper
@ -805,4 +852,124 @@ elif len(sys.argv) > 1 and sys.argv[1] == "exec":
panic!("job_summary event should contain JobEvent"); panic!("job_summary event should contain JobEvent");
} }
} }
/// Runs a job that sleeps for ~300ms with a 100ms heartbeat interval and checks
/// that at least one heartbeat event reaches the sink and that it carries the
/// memory and CPU metadata keys.
///
/// NOTE(review): this test mutates process-wide environment variables
/// (`DATABUILD_*`) that other tests in this module also set; when tests run in
/// parallel they can observe each other's values. Consider serialising such
/// tests behind a shared lock.
#[test]
fn test_heartbeat_functionality() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    // A job script that stays alive long enough for heartbeats to fire.
    // The Python block bodies must be indented or the interpreter rejects the script.
    let script_content = r#"#!/usr/bin/env python3
import sys
import json
import time
if len(sys.argv) > 1 and sys.argv[1] == "config":
    config = {
        "outputs": [{"str": "test/heartbeat"}],
        "inputs": [],
        "args": [],
        "env": {"PARTITION_REF": "test/heartbeat"}
    }
    print(json.dumps({"configs": [config]}))
elif len(sys.argv) > 1 and sys.argv[1] == "exec":
    # Sleep long enough to trigger at least 2 heartbeats
    time.sleep(0.3)  # 300ms with 100ms heartbeat interval should give us 2-3 heartbeats
    print("Job completed")
"#;

    let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
    temp_file
        .write_all(script_content.as_bytes())
        .expect("Failed to write script");

    // Close the write handle but keep the file on disk until the end of the test:
    // on Linux, executing a file that is still open for writing can fail with
    // ETXTBSY ("Text file busy").
    let script_file = temp_file.into_temp_path();
    let script_path = script_file
        .to_str()
        .expect("temp file path should be valid UTF-8");

    // Make script executable
    std::fs::set_permissions(
        script_path,
        std::os::unix::fs::PermissionsExt::from_mode(0o755),
    )
    .expect("Failed to set permissions");

    // Set up environment for fast heartbeats and the test script
    env::set_var(
        "DATABUILD_HEARTBEAT_INTERVAL_MS",
        TEST_HEARTBEAT_INTERVAL_MS.to_string(),
    );
    env::set_var(
        "DATABUILD_METRICS_INTERVAL_MS",
        TEST_METRICS_INTERVAL_MS.to_string(),
    );
    env::set_var("DATABUILD_JOB_BINARY", script_path);

    // Create test sink and wrapper
    let sink = TestSink::new();
    let mut wrapper = JobWrapper::new_with_sink(sink);

    // Create a JobConfig for the test
    let config = JobConfig {
        outputs: vec![PartitionRef {
            r#str: "test/heartbeat".to_string(),
        }],
        inputs: vec![],
        args: vec![],
        env: {
            let mut env_map = std::collections::HashMap::new();
            env_map.insert("PARTITION_REF".to_string(), "test/heartbeat".to_string());
            env_map
        },
    };

    // Run the job
    let result = wrapper.exec_mode_with_config(script_path, config);

    // Clean up the environment before asserting, so a failed assertion does not
    // leave the overrides in place for other tests.
    env::remove_var("DATABUILD_HEARTBEAT_INTERVAL_MS");
    env::remove_var("DATABUILD_METRICS_INTERVAL_MS");
    env::remove_var("DATABUILD_JOB_BINARY");

    // Check that exec_mode succeeded
    assert!(result.is_ok(), "exec_mode should succeed: {:?}", result);

    // Gather every heartbeat job event once; both checks below use this list.
    let heartbeats: Vec<_> = wrapper
        .sink
        .entries
        .iter()
        .filter_map(|entry| match &entry.content {
            Some(job_log_entry::Content::JobEvent(event))
                if event.event_type == "heartbeat" =>
            {
                Some(event)
            }
            _ => None,
        })
        .collect();

    // Timing is not exact, so only require one heartbeat even though the job
    // usually lives long enough for two or three.
    assert!(
        !heartbeats.is_empty(),
        "Expected at least 1 heartbeat event, but got {}",
        heartbeats.len()
    );

    // Verify heartbeat event structure on the first heartbeat.
    let heartbeat_event = heartbeats
        .first()
        .expect("Should have at least one heartbeat event");
    assert!(
        heartbeat_event.metadata.contains_key("memory_usage_mb"),
        "Heartbeat should contain memory_usage_mb"
    );
    assert!(
        heartbeat_event.metadata.contains_key("cpu_usage_percent"),
        "Heartbeat should contain cpu_usage_percent"
    );
}
} }