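//! Collects job process output and persists it as JSONL, one file per job
//! run, grouped into date-stamped directories.
//!
//! A minimal end-to-end sketch. The `databuild::log_collector` import path is
//! an assumption (this file does not fix its crate or module name), so the
//! example is marked `ignore`:
//!
//! ```ignore
//! use databuild::log_collector::LogCollector; // hypothetical path
//! use std::io::BufReader;
//!
//! let mut collector = LogCollector::new(LogCollector::default_logs_dir())?;
//! collector.set_job_label("job_123", "example_label");
//!
//! // In real use this reader would wrap a child process's captured stdout.
//! let stdout = BufReader::new(std::io::empty());
//! collector.consume_job_output("job_123", stdout)?;
//! collector.close_all()?;
//! ```
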
use crate::{JobLogEntry, job_log_entry};
use serde_json;
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;

/// Errors produced while collecting and persisting job logs.
#[derive(Error, Debug)]
pub enum LogCollectorError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("JSON parsing error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("Invalid log entry: {0}")]
    InvalidLogEntry(String),
}

/// Collects structured job output and writes it as JSONL, one file per job
/// run, grouped into date-stamped directories under `logs_dir`.
pub struct LogCollector {
    logs_dir: PathBuf,
    active_files: HashMap<String, File>,
    job_label_mapping: HashMap<String, String>, // job_run_id -> job_label
}

impl LogCollector {
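    /// Create a collector rooted at `logs_dir`, creating the directory if it
    /// does not exist yet.
    ///
    /// A sketch of typical use (hedged: uses `tempfile`, as the tests below
    /// do, and is marked `ignore` because the import path is not fixed here):
    ///
    /// ```ignore
    /// let tmp = tempfile::TempDir::new()?;
    /// let mut collector = LogCollector::new(tmp.path())?;
    /// ```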
    pub fn new<P: AsRef<Path>>(logs_dir: P) -> Result<Self, LogCollectorError> {
        let logs_dir = logs_dir.as_ref().to_path_buf();

        // Ensure the base logs directory exists; create_dir_all is a no-op
        // when it already does, so no exists() check is needed.
        fs::create_dir_all(&logs_dir)?;

        Ok(Self {
            logs_dir,
            active_files: HashMap::new(),
            job_label_mapping: HashMap::new(),
        })
    }

    /// Set the job label for a specific job run ID.
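    ///
    /// Labels registered here are used by [`Self::consume_job_output`] to
    /// enrich `JobEvent` and `Manifest` entries for that run. A sketch (the
    /// IDs are hypothetical):
    ///
    /// ```ignore
    /// collector.set_job_label("job_123", "nightly_etl");
    /// ```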
    pub fn set_job_label(&mut self, job_run_id: &str, job_label: &str) {
        self.job_label_mapping
            .insert(job_run_id.to_string(), job_label.to_string());
    }

    /// Get the default logs directory from the `DATABUILD_LOGS_DIR`
    /// environment variable, falling back to `./logs/databuild` under the
    /// current working directory.
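    ///
    /// A sketch of the override behavior (assumes only `std`; marked `ignore`
    /// because doc tests here lack a fixed crate path):
    ///
    /// ```ignore
    /// std::env::set_var("DATABUILD_LOGS_DIR", "/tmp/databuild-logs");
    /// assert_eq!(
    ///     LogCollector::default_logs_dir(),
    ///     std::path::PathBuf::from("/tmp/databuild-logs")
    /// );
    /// ```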
    pub fn default_logs_dir() -> PathBuf {
        std::env::var("DATABUILD_LOGS_DIR")
            .map(PathBuf::from)
            .unwrap_or_else(|_| {
                // Fall back to ./logs/databuild for safety - avoid system directories
                std::env::current_dir()
                    .unwrap_or_else(|_| PathBuf::from("."))
                    .join("logs")
                    .join("databuild")
            })
    }

    /// Create (if needed) and return today's date-organized directory,
    /// e.g. `<logs_dir>/2024-01-15`.
    fn get_date_directory(&self) -> Result<PathBuf, LogCollectorError> {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| LogCollectorError::InvalidLogEntry(format!("System time error: {}", e)))?;

        let timestamp = now.as_secs();
        let datetime = chrono::DateTime::from_timestamp(timestamp as i64, 0)
            .ok_or_else(|| LogCollectorError::InvalidLogEntry("Invalid timestamp".to_string()))?;

        let date_str = datetime.format("%Y-%m-%d").to_string();
        let date_dir = self.logs_dir.join(date_str);

        // Ensure the date directory exists; create_dir_all succeeds if it
        // already does.
        fs::create_dir_all(&date_dir)?;

        Ok(date_dir)
    }

    /// Get or create an append-mode file handle for a specific job run.
    fn get_job_file(&mut self, job_run_id: &str) -> Result<&mut File, LogCollectorError> {
        if !self.active_files.contains_key(job_run_id) {
            let date_dir = self.get_date_directory()?;
            let file_path = date_dir.join(format!("{}.jsonl", job_run_id));

            let file = OpenOptions::new()
                .create(true)
                .append(true)
                .open(&file_path)?;

            self.active_files.insert(job_run_id.to_string(), file);
        }

        // The entry was inserted above if it was missing, so this lookup
        // cannot fail.
        Ok(self.active_files.get_mut(job_run_id).unwrap())
    }

    /// Write a single log entry to the appropriate JSONL file.
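    ///
    /// Each entry becomes one JSON line in
    /// `<logs_dir>/<YYYY-MM-DD>/<job_run_id>.jsonl`. A sketch mirroring the
    /// test helper below (marked `ignore`; the field types come from this
    /// crate's generated `JobLogEntry`):
    ///
    /// ```ignore
    /// let entry = JobLogEntry {
    ///     timestamp: "1234567890".to_string(),
    ///     job_id: "job_123".to_string(),
    ///     outputs: vec![],
    ///     sequence_number: 1,
    ///     content: Some(job_log_entry::Content::Log(LogMessage {
    ///         level: log_message::LogLevel::Info as i32,
    ///         message: "hello".to_string(),
    ///         fields: HashMap::new(),
    ///     })),
    /// };
    /// collector.write_log_entry("job_123", &entry)?;
    /// ```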
    pub fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<(), LogCollectorError> {
        let file = self.get_job_file(job_run_id)?;
        let json_line = serde_json::to_string(entry)?;
        writeln!(file, "{}", json_line)?;
        file.flush()?;
        Ok(())
    }

    /// Consume stdout from a job process, parsing each line as a structured
    /// `JobLogEntry` where possible and storing everything else as raw
    /// output.
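    ///
    /// Lines that fail to parse are not errors; they are wrapped as INFO
    /// `LogMessage` entries with `sequence_number` 0. A sketch (hedged: real
    /// callers would pass a child process's stdout rather than a cursor):
    ///
    /// ```ignore
    /// use std::io::Cursor;
    ///
    /// let reader = Cursor::new("plain stdout line\n");
    /// collector.consume_job_output("job_123", reader)?;
    /// ```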
    pub fn consume_job_output<R: BufRead>(&mut self, job_run_id: &str, reader: R) -> Result<(), LogCollectorError> {
        for line in reader.lines() {
            let line = line?;

            // Skip empty lines
            if line.trim().is_empty() {
                continue;
            }

            // Try to parse as JobLogEntry
            match serde_json::from_str::<JobLogEntry>(&line) {
                Ok(mut entry) => {
                    // Validate that the job_id matches
                    if entry.job_id != job_run_id {
                        return Err(LogCollectorError::InvalidLogEntry(
                            format!("Job ID mismatch: expected {}, got {}", job_run_id, entry.job_id)
                        ));
                    }

                    // Enrich WrapperJobEvent and Manifest entries with the
                    // job label, if one was registered via set_job_label
                    if let Some(job_label) = self.job_label_mapping.get(job_run_id) {
                        match &mut entry.content {
                            Some(job_log_entry::Content::JobEvent(ref mut job_event)) => {
                                job_event.job_label = Some(job_label.clone());
                            }
                            Some(job_log_entry::Content::Manifest(ref mut manifest)) => {
                                if let Some(ref mut task) = manifest.task {
                                    if let Some(ref mut job) = task.job {
                                        job.label = job_label.clone();
                                    }
                                }
                            }
                            _ => {} // No enrichment needed for Log entries
                        }
                    }

                    self.write_log_entry(job_run_id, &entry)?;
                }
                Err(_) => {
                    // If it's not a JobLogEntry, treat it as raw output and
                    // create a log entry; a pre-epoch system clock is
                    // propagated as an error rather than a panic.
                    let raw_entry = JobLogEntry {
                        timestamp: SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .map_err(|e| LogCollectorError::InvalidLogEntry(format!("System time error: {}", e)))?
                            .as_secs()
                            .to_string(),
                        job_id: job_run_id.to_string(),
                        outputs: vec![], // Raw output doesn't have specific outputs
                        sequence_number: 0, // Raw output gets sequence 0
                        content: Some(job_log_entry::Content::Log(crate::LogMessage {
                            level: crate::log_message::LogLevel::Info as i32,
                            message: line,
                            fields: HashMap::new(),
                        })),
                    };

                    self.write_log_entry(job_run_id, &raw_entry)?;
                }
            }
        }

        Ok(())
    }

    /// Flush and close all active files.
    pub fn close_all(&mut self) -> Result<(), LogCollectorError> {
        for (_, mut file) in self.active_files.drain() {
            file.flush()?;
        }
        Ok(())
    }

    /// Flush and close a specific job's file.
    pub fn close_job(&mut self, job_run_id: &str) -> Result<(), LogCollectorError> {
        if let Some(mut file) = self.active_files.remove(job_run_id) {
            file.flush()?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{job_log_entry, log_message, LogMessage, PartitionRef};
    use std::io::Cursor;
    use tempfile::TempDir;

    fn create_test_log_entry(job_id: &str, sequence: u64) -> JobLogEntry {
        JobLogEntry {
            timestamp: "1234567890".to_string(),
            job_id: job_id.to_string(),
            outputs: vec![PartitionRef { r#str: "test/partition".to_string() }],
            sequence_number: sequence,
            content: Some(job_log_entry::Content::Log(LogMessage {
                level: log_message::LogLevel::Info as i32,
                message: "Test log message".to_string(),
                fields: HashMap::new(),
            })),
        }
    }

    #[test]
    fn test_log_collector_creation() {
        let temp_dir = TempDir::new().unwrap();
        let collector = LogCollector::new(temp_dir.path()).unwrap();

        assert_eq!(collector.logs_dir, temp_dir.path());
        assert!(collector.active_files.is_empty());
    }

    #[test]
    fn test_write_single_log_entry() {
        let temp_dir = TempDir::new().unwrap();
        let mut collector = LogCollector::new(temp_dir.path()).unwrap();

        let entry = create_test_log_entry("job_123", 1);
        collector.write_log_entry("job_123", &entry).unwrap();

        // Verify file was created and contains the entry
        collector.close_all().unwrap();

        // Check that a date directory was created
        let date_dirs: Vec<_> = fs::read_dir(temp_dir.path()).unwrap().collect();
        assert_eq!(date_dirs.len(), 1);

        // Check that the job file exists in the date directory
        let date_dir_path = date_dirs[0].as_ref().unwrap().path();
        let job_files: Vec<_> = fs::read_dir(&date_dir_path).unwrap().collect();
        assert_eq!(job_files.len(), 1);

        let job_file_path = job_files[0].as_ref().unwrap().path();
        assert!(job_file_path.file_name().unwrap().to_string_lossy().contains("job_123"));

        // Verify content
        let content = fs::read_to_string(&job_file_path).unwrap();
        assert!(content.contains("Test log message"));
        assert!(content.contains("\"sequence_number\":1"));
    }

    #[test]
    fn test_consume_structured_output() {
        let temp_dir = TempDir::new().unwrap();
        let mut collector = LogCollector::new(temp_dir.path()).unwrap();

        let entry1 = create_test_log_entry("job_456", 1);
        let entry2 = create_test_log_entry("job_456", 2);

        let input = format!("{}\n{}\n",
            serde_json::to_string(&entry1).unwrap(),
            serde_json::to_string(&entry2).unwrap()
        );

        let reader = Cursor::new(input);
        collector.consume_job_output("job_456", reader).unwrap();
        collector.close_all().unwrap();

        // Verify both entries were written
        let date_dirs: Vec<_> = fs::read_dir(temp_dir.path()).unwrap().collect();
        let date_dir_path = date_dirs[0].as_ref().unwrap().path();
        let job_files: Vec<_> = fs::read_dir(&date_dir_path).unwrap().collect();
        let job_file_path = job_files[0].as_ref().unwrap().path();

        let content = fs::read_to_string(&job_file_path).unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);

        // Verify both entries can be parsed back
        let parsed1: JobLogEntry = serde_json::from_str(lines[0]).unwrap();
        let parsed2: JobLogEntry = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(parsed1.sequence_number, 1);
        assert_eq!(parsed2.sequence_number, 2);
    }

    #[test]
    fn test_consume_mixed_output() {
        let temp_dir = TempDir::new().unwrap();
        let mut collector = LogCollector::new(temp_dir.path()).unwrap();

        let entry = create_test_log_entry("job_789", 1);
        let structured_line = serde_json::to_string(&entry).unwrap();

        let input = format!("{}\nRaw output line\nAnother raw line\n", structured_line);

        let reader = Cursor::new(input);
        collector.consume_job_output("job_789", reader).unwrap();
        collector.close_all().unwrap();

        // Verify all lines were captured (1 structured + 2 raw)
        let date_dirs: Vec<_> = fs::read_dir(temp_dir.path()).unwrap().collect();
        let date_dir_path = date_dirs[0].as_ref().unwrap().path();
        let job_files: Vec<_> = fs::read_dir(&date_dir_path).unwrap().collect();
        let job_file_path = job_files[0].as_ref().unwrap().path();

        let content = fs::read_to_string(&job_file_path).unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();
        assert_eq!(lines.len(), 3);

        // First line should be the structured entry
        let parsed1: JobLogEntry = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(parsed1.sequence_number, 1);

        // Second and third lines should be raw output entries
        let parsed2: JobLogEntry = serde_json::from_str(lines[1]).unwrap();
        let parsed3: JobLogEntry = serde_json::from_str(lines[2]).unwrap();
        assert_eq!(parsed2.sequence_number, 0); // Raw output gets sequence 0
        assert_eq!(parsed3.sequence_number, 0);

        if let Some(job_log_entry::Content::Log(log_msg)) = &parsed2.content {
            assert_eq!(log_msg.message, "Raw output line");
        } else {
            panic!("Expected log content");
        }
    }

    #[test]
    fn test_default_logs_dir() {
        let default_dir = LogCollector::default_logs_dir();

        // Should be a valid path
        assert!(default_dir.is_absolute() || default_dir.starts_with("."));
        assert!(default_dir.to_string_lossy().contains("logs"));
        assert!(default_dir.to_string_lossy().contains("databuild"));
    }

    #[test]
    fn test_job_id_validation() {
        let temp_dir = TempDir::new().unwrap();
        let mut collector = LogCollector::new(temp_dir.path()).unwrap();

        // The entry's job_id deliberately differs from the run ID we pass in.
        let entry = create_test_log_entry("wrong_job_id", 1);

        let input = serde_json::to_string(&entry).unwrap();
        let reader = Cursor::new(input);

        let result = collector.consume_job_output("expected_job_id", reader);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Job ID mismatch"));
    }
}