// Listing metadata from the source viewer: 562 lines, 21 KiB, Rust, no trailing newline.
use std::env;
|
|
use std::io::{self, Read, Write};
|
|
use std::process::{Command, Stdio};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::{SystemTime, UNIX_EPOCH, Duration};
|
|
use std::thread;
|
|
// All serialization handled by protobuf serde derives
|
|
use serde_json;
|
|
use uuid::Uuid;
|
|
use sysinfo::{System, ProcessRefreshKind, Pid};
|
|
|
|
// Import protobuf types from databuild
|
|
use databuild::{
|
|
PartitionRef, PartitionManifest, Task, JobLabel, JobConfig,
|
|
JobLogEntry, LogMessage, WrapperJobEvent, job_log_entry, log_message
|
|
};
|
|
|
|
// All log and config message types come from protobuf; the only local type is the wrapper state below.
|
|
|
|
/// Per-invocation state of the job wrapper.
///
/// One value is created per process run and is threaded through every log entry the
/// wrapper emits.
#[derive(Debug)]
struct JobWrapper {
    /// Identifier attached to every emitted log entry.
    job_id: String,
    /// Sequence number of the most recently emitted log entry (0 before the first one).
    sequence_number: u64,
    /// Wrapper start time in whole seconds since the Unix epoch.
    start_time: i64,
}
|
|
|
|
impl JobWrapper {
|
|
fn new() -> Self {
|
|
Self {
|
|
job_id: Uuid::new_v4().to_string(),
|
|
sequence_number: 0,
|
|
start_time: SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_secs() as i64,
|
|
}
|
|
}
|
|
|
|
fn get_timestamp() -> String {
|
|
SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_secs()
|
|
.to_string()
|
|
}
|
|
|
|
fn next_sequence(&mut self) -> u64 {
|
|
self.sequence_number += 1;
|
|
self.sequence_number
|
|
}
|
|
|
|
fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
|
|
let entry = JobLogEntry {
|
|
timestamp: Self::get_timestamp(),
|
|
job_id: self.job_id.clone(),
|
|
partition_ref: partition_ref.to_string(),
|
|
sequence_number: self.next_sequence(),
|
|
content: Some(content),
|
|
};
|
|
|
|
println!("{}", serde_json::to_string(&entry).unwrap());
|
|
}
|
|
|
|
fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
|
|
// Parse the partition ref from args (first argument)
|
|
let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();
|
|
|
|
// Following the state diagram: wrapper_validate_config -> emit_config_validate_success
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "config_validate_success".to_string(),
|
|
metadata: std::collections::HashMap::new(),
|
|
job_status: None,
|
|
exit_code: None,
|
|
}));
|
|
|
|
// For Phase 0, we still need to produce the expected JSON config format
|
|
// so the current graph system can parse it. Later phases will change this.
|
|
let config = JobConfig {
|
|
outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
|
|
inputs: vec![],
|
|
args: outputs.clone(),
|
|
env: {
|
|
let mut env_map = std::collections::HashMap::new();
|
|
if let Some(partition_ref) = outputs.first() {
|
|
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
|
|
}
|
|
env_map
|
|
},
|
|
};
|
|
|
|
// For config mode, we need to output the standard config format to stdout
|
|
// The structured logs will come later during exec mode
|
|
let configs_wrapper = serde_json::json!({
|
|
"configs": [config]
|
|
});
|
|
|
|
println!("{}", serde_json::to_string(&configs_wrapper)?);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn exec_mode(&mut self, job_binary: &str) -> Result<(), Box<dyn std::error::Error>> {
|
|
// Read the job config from stdin
|
|
let mut buffer = String::new();
|
|
io::stdin().read_to_string(&mut buffer)?;
|
|
|
|
let config: JobConfig = serde_json::from_str(&buffer)?;
|
|
let partition_ref = config.outputs.first()
|
|
.map(|p| p.str.clone())
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
|
|
// Following the state diagram:
|
|
// 1. wrapper_validate_config -> emit_config_validate_success
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "config_validate_success".to_string(),
|
|
job_status: None,
|
|
exit_code: None,
|
|
metadata: std::collections::HashMap::new(),
|
|
}));
|
|
|
|
// 2. wrapper_launch_task -> emit_task_launch_success
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "task_launch_success".to_string(),
|
|
job_status: None,
|
|
exit_code: None,
|
|
metadata: std::collections::HashMap::new(),
|
|
}));
|
|
|
|
// Execute the original job binary with the exec subcommand
|
|
let mut cmd = Command::new(job_binary);
|
|
cmd.arg("exec");
|
|
|
|
// Add the args from the config
|
|
for arg in &config.args {
|
|
cmd.arg(arg);
|
|
}
|
|
|
|
cmd.stdin(Stdio::piped())
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped());
|
|
|
|
// Set environment variables from config
|
|
for (key, value) in &config.env {
|
|
cmd.env(key, value);
|
|
}
|
|
|
|
let mut child = cmd.spawn()?;
|
|
let child_pid = child.id();
|
|
|
|
// Send the config to the job
|
|
if let Some(stdin) = child.stdin.as_mut() {
|
|
stdin.write_all(buffer.as_bytes())?;
|
|
}
|
|
|
|
// Start heartbeat thread
|
|
let heartbeat_job_id = self.job_id.clone();
|
|
let heartbeat_partition_ref = partition_ref.clone();
|
|
let heartbeat_sequence = Arc::new(Mutex::new(0u64));
|
|
let heartbeat_sequence_clone = heartbeat_sequence.clone();
|
|
|
|
let heartbeat_handle = thread::spawn(move || {
|
|
let mut system = System::new_all();
|
|
let pid = Pid::from(child_pid as usize);
|
|
|
|
loop {
|
|
thread::sleep(Duration::from_secs(30));
|
|
|
|
// Refresh process info
|
|
system.refresh_processes_specifics(ProcessRefreshKind::new());
|
|
|
|
// Check if process still exists
|
|
if let Some(process) = system.process(pid) {
|
|
let memory_mb = process.memory() / 1024 / 1024;
|
|
let cpu_percent = process.cpu_usage();
|
|
|
|
// Create heartbeat event with metrics
|
|
let mut metadata = std::collections::HashMap::new();
|
|
metadata.insert("memory_usage_mb".to_string(), memory_mb.to_string());
|
|
metadata.insert("cpu_usage_percent".to_string(), cpu_percent.to_string());
|
|
|
|
// Get next sequence number for heartbeat
|
|
let seq = {
|
|
let mut seq_lock = heartbeat_sequence_clone.lock().unwrap();
|
|
*seq_lock += 1;
|
|
*seq_lock
|
|
};
|
|
|
|
let heartbeat_event = JobLogEntry {
|
|
timestamp: JobWrapper::get_timestamp(),
|
|
job_id: heartbeat_job_id.clone(),
|
|
partition_ref: heartbeat_partition_ref.clone(),
|
|
sequence_number: seq,
|
|
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "heartbeat".to_string(),
|
|
job_status: None,
|
|
exit_code: None,
|
|
metadata,
|
|
})),
|
|
};
|
|
|
|
// Print the heartbeat (thread-safe since println! is synchronized)
|
|
println!("{}", serde_json::to_string(&heartbeat_event).unwrap());
|
|
} else {
|
|
// Process no longer exists, exit heartbeat thread
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Track metrics while job is running
|
|
let job_start_time = SystemTime::now();
|
|
let mut system = System::new_all();
|
|
let pid = Pid::from(child_pid as usize);
|
|
|
|
let mut peak_memory_mb = 0u64;
|
|
let mut cpu_samples = Vec::new();
|
|
let mut stdout_buffer = Vec::new();
|
|
let mut stderr_buffer = Vec::new();
|
|
|
|
// Poll process status and metrics
|
|
let (output, peak_memory_mb, total_cpu_ms, job_duration) = loop {
|
|
// Check if process has exited
|
|
match child.try_wait()? {
|
|
Some(status) => {
|
|
// Process has exited, collect any remaining output
|
|
if let Some(mut stdout) = child.stdout.take() {
|
|
stdout.read_to_end(&mut stdout_buffer)?;
|
|
}
|
|
if let Some(mut stderr) = child.stderr.take() {
|
|
stderr.read_to_end(&mut stderr_buffer)?;
|
|
}
|
|
|
|
// Calculate final metrics
|
|
let job_duration = job_start_time.elapsed()
|
|
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Time calculation error: {}", e)))?;
|
|
let total_cpu_ms = (cpu_samples.iter().sum::<f32>() * 10.0) as u64; // Convert to milliseconds
|
|
|
|
// Stop heartbeat thread
|
|
drop(heartbeat_handle);
|
|
|
|
// Update sequence number to account for heartbeats
|
|
let heartbeat_count = heartbeat_sequence.lock().unwrap();
|
|
self.sequence_number = self.sequence_number.max(*heartbeat_count);
|
|
drop(heartbeat_count);
|
|
|
|
// Create output struct to match original behavior
|
|
let output = std::process::Output {
|
|
status,
|
|
stdout: stdout_buffer,
|
|
stderr: stderr_buffer,
|
|
};
|
|
|
|
break (output, peak_memory_mb, total_cpu_ms, job_duration);
|
|
}
|
|
None => {
|
|
// Process still running, collect metrics
|
|
system.refresh_processes_specifics(ProcessRefreshKind::new());
|
|
|
|
if let Some(process) = system.process(pid) {
|
|
let memory_mb = process.memory() / 1024 / 1024;
|
|
peak_memory_mb = peak_memory_mb.max(memory_mb);
|
|
cpu_samples.push(process.cpu_usage());
|
|
}
|
|
|
|
// Sleep briefly before next poll (100ms)
|
|
thread::sleep(Duration::from_millis(100));
|
|
}
|
|
}
|
|
};
|
|
let success = output.status.success();
|
|
let exit_code = output.status.code().unwrap_or(-1);
|
|
|
|
// Capture and forward job stdout/stderr as log messages
|
|
if !output.stdout.is_empty() {
|
|
let stdout_str = String::from_utf8_lossy(&output.stdout);
|
|
self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
|
|
level: log_message::LogLevel::Info as i32,
|
|
message: stdout_str.to_string(),
|
|
fields: std::collections::HashMap::new(),
|
|
}));
|
|
}
|
|
|
|
if !output.stderr.is_empty() {
|
|
let stderr_str = String::from_utf8_lossy(&output.stderr);
|
|
self.emit_log(&partition_ref, job_log_entry::Content::Log(LogMessage {
|
|
level: log_message::LogLevel::Error as i32,
|
|
message: stderr_str.to_string(),
|
|
fields: std::collections::HashMap::new(),
|
|
}));
|
|
}
|
|
|
|
// Emit job summary with resource metrics
|
|
let mut summary_metadata = std::collections::HashMap::new();
|
|
summary_metadata.insert("runtime_ms".to_string(), job_duration.as_millis().to_string());
|
|
summary_metadata.insert("peak_memory_mb".to_string(), peak_memory_mb.to_string());
|
|
summary_metadata.insert("total_cpu_ms".to_string(), total_cpu_ms.to_string());
|
|
summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
|
|
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "job_summary".to_string(),
|
|
job_status: None,
|
|
exit_code: Some(exit_code),
|
|
metadata: summary_metadata,
|
|
}));
|
|
|
|
if success {
|
|
// Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "task_success".to_string(),
|
|
job_status: Some("JOB_COMPLETED".to_string()),
|
|
exit_code: Some(exit_code),
|
|
metadata: std::collections::HashMap::new(),
|
|
}));
|
|
|
|
// Then emit_partition_manifest -> success
|
|
let end_time = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_secs() as i64;
|
|
|
|
self.emit_log(&partition_ref, job_log_entry::Content::Manifest(PartitionManifest {
|
|
outputs: config.outputs.clone(),
|
|
inputs: vec![], // Phase 0: no input manifests yet
|
|
start_time: self.start_time,
|
|
end_time,
|
|
task: Some(Task {
|
|
job: Some(JobLabel {
|
|
label: env::var("DATABUILD_JOB_LABEL").unwrap_or_else(|_| "unknown".to_string()),
|
|
}),
|
|
config: Some(config.clone()),
|
|
}),
|
|
metadata: std::collections::HashMap::new(), // Phase 0: no metadata yet
|
|
}));
|
|
} else {
|
|
// Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "task_failed".to_string(),
|
|
job_status: Some("JOB_FAILED".to_string()),
|
|
exit_code: Some(exit_code),
|
|
metadata: std::collections::HashMap::new(),
|
|
}));
|
|
|
|
// Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
|
|
self.emit_log(&partition_ref, job_log_entry::Content::JobEvent(WrapperJobEvent {
|
|
event_type: "job_exec_fail".to_string(),
|
|
job_status: Some("JOB_FAILED".to_string()),
|
|
exit_code: Some(exit_code),
|
|
metadata: {
|
|
let mut meta = std::collections::HashMap::new();
|
|
meta.insert("error".to_string(), format!("Job failed with exit code {}", exit_code));
|
|
meta
|
|
},
|
|
}));
|
|
}
|
|
|
|
// Forward the original job's output to stdout for compatibility
|
|
io::stdout().write_all(&output.stdout)?;
|
|
io::stderr().write_all(&output.stderr)?;
|
|
|
|
if !success {
|
|
std::process::exit(exit_code);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
let args: Vec<String> = env::args().collect();
|
|
|
|
if args.len() < 2 {
|
|
eprintln!("Usage: job_wrapper <config|exec> [args...]");
|
|
std::process::exit(1);
|
|
}
|
|
|
|
let mode = &args[1];
|
|
let mut wrapper = JobWrapper::new();
|
|
|
|
match mode.as_str() {
|
|
"config" => {
|
|
let outputs = args[2..].to_vec();
|
|
wrapper.config_mode(outputs)?;
|
|
}
|
|
"exec" => {
|
|
// For exec mode, we need to know which original job binary to call
|
|
// For Phase 0, we'll derive this from environment or make it configurable
|
|
let job_binary = env::var("DATABUILD_JOB_BINARY")
|
|
.unwrap_or_else(|_| "python3".to_string()); // Default fallback
|
|
|
|
wrapper.exec_mode(&job_binary)?;
|
|
}
|
|
_ => {
|
|
eprintln!("Unknown mode: {}", mode);
|
|
std::process::exit(1);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a `JobConfig` whose outputs and args are `outputs` and whose
    /// `PARTITION_REF` env var is the first output, if there is one.
    fn generate_test_config(outputs: &[String]) -> JobConfig {
        let mut env = HashMap::new();
        if let Some(first) = outputs.first() {
            env.insert("PARTITION_REF".to_string(), first.clone());
        }

        JobConfig {
            outputs: outputs
                .iter()
                .cloned()
                .map(|s| PartitionRef { r#str: s })
                .collect(),
            inputs: Vec::new(),
            args: outputs.to_vec(),
            env,
        }
    }

    /// A log entry serializes with the expected JSON field names and values.
    #[test]
    fn test_job_log_entry_serialization() {
        let message = LogMessage {
            level: log_message::LogLevel::Info as i32,
            message: "test message".to_string(),
            fields: HashMap::new(),
        };
        let entry = JobLogEntry {
            timestamp: "1234567890".to_string(),
            job_id: "test-id".to_string(),
            partition_ref: "test/partition".to_string(),
            sequence_number: 1,
            content: Some(job_log_entry::Content::Log(message)),
        };

        let json = serde_json::to_string(&entry).unwrap();
        let expected_fragments = [
            "\"timestamp\":\"1234567890\"",
            "\"sequence_number\":1",
            "\"Log\":{", // Capitalized field name
            "\"message\":\"test message\"",
        ];
        for fragment in expected_fragments.iter() {
            assert!(json.contains(*fragment), "missing {} in {}", fragment, json);
        }
    }

    /// Sequence numbers start at 1 and increase by one per call.
    #[test]
    fn test_sequence_number_increment() {
        let mut wrapper = JobWrapper::new();
        for expected in 1..=3u64 {
            assert_eq!(wrapper.next_sequence(), expected);
        }
    }

    /// A single-output config carries that output in outputs, args and PARTITION_REF.
    #[test]
    fn test_config_mode_output_format() {
        let outputs = vec!["test/partition".to_string()];
        let config = generate_test_config(&outputs);

        // Verify it produces expected structure
        assert_eq!(config.outputs.len(), 1);
        assert_eq!(config.outputs[0].r#str, "test/partition");
        assert_eq!(config.args, outputs);
        assert_eq!(
            config.env.get("PARTITION_REF").map(String::as_str),
            Some("test/partition")
        );
    }

    /// With several outputs, all are listed and the first one becomes PARTITION_REF.
    #[test]
    fn test_multiple_outputs_config() {
        let first = "reviews/date=2025-01-01";
        let second = "reviews/date=2025-01-02";
        let config = generate_test_config(&[first.to_string(), second.to_string()]);

        assert_eq!(config.outputs.len(), 2);
        assert_eq!(config.outputs[0].r#str, first);
        assert_eq!(config.outputs[1].r#str, second);
        // First output is used as PARTITION_REF
        assert_eq!(
            config.env.get("PARTITION_REF").map(String::as_str),
            Some(first)
        );
    }

    /// Success and failure events keep the fields they were built with.
    #[test]
    fn test_wrapper_job_event_creation() {
        let cases = [
            ("task_success", "JOB_COMPLETED", 0),
            ("task_failed", "JOB_FAILED", 1),
        ];

        for &(event_type, job_status, exit_code) in cases.iter() {
            let event = WrapperJobEvent {
                event_type: event_type.to_string(),
                job_status: Some(job_status.to_string()),
                exit_code: Some(exit_code),
                metadata: HashMap::new(),
            };
            assert_eq!(event.event_type, event_type);
            assert_eq!(event.job_status.as_deref(), Some(job_status));
            assert_eq!(event.exit_code, Some(exit_code));
        }
    }

    /// Log messages store the numeric value of the level they were built with.
    #[test]
    fn test_log_message_levels() {
        let cases = [
            (log_message::LogLevel::Info as i32, "info message"),
            (log_message::LogLevel::Error as i32, "error message"),
        ];

        for &(level, text) in cases.iter() {
            let log = LogMessage {
                level,
                message: text.to_string(),
                fields: HashMap::new(),
            };
            assert_eq!(log.level, level);
        }
    }

    /// A manifest built from a config exposes its outputs, times and task.
    #[test]
    fn test_partition_manifest_structure() {
        let config = generate_test_config(&["test/partition".to_string()]);
        let outputs = config.outputs.clone();
        let task = Task {
            job: Some(JobLabel {
                label: "//test:job".to_string(),
            }),
            config: Some(config),
        };
        let manifest = PartitionManifest {
            outputs,
            inputs: Vec::new(),
            start_time: 1234567890,
            end_time: 1234567900,
            task: Some(task),
            metadata: HashMap::new(),
        };

        assert_eq!(manifest.outputs.len(), 1);
        assert_eq!(manifest.outputs[0].r#str, "test/partition");
        assert_eq!(manifest.end_time - manifest.start_time, 10);
        assert!(manifest.task.is_some());
    }

    /// Timestamps are decimal integers and never go backwards between two calls.
    #[test]
    fn test_timestamp_generation() {
        let earlier = JobWrapper::get_timestamp();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let later = JobWrapper::get_timestamp();

        // Timestamps should be parseable as integers
        let earlier_secs: u64 = earlier.parse().expect("Should be valid timestamp");
        let later_secs: u64 = later.parse().expect("Should be valid timestamp");

        // Second timestamp should be equal or greater
        assert!(later_secs >= earlier_secs);
    }

    /// A fresh wrapper has a zero sequence counter, a non-empty id and a positive start time.
    #[test]
    fn test_job_wrapper_initialization() {
        let wrapper = JobWrapper::new();
        assert_eq!(wrapper.sequence_number, 0);
        assert!(!wrapper.job_id.is_empty());
        assert!(wrapper.start_time > 0);
    }
}