databuild/databuild/metrics_aggregator.rs

507 lines
No EOL
20 KiB
Rust

use crate::{JobLogEntry, log_access::LogReader, metric_templates::{MetricTemplate, ExtractedMetric, MetricType, get_standard_metrics}};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MetricsError {
#[error("Log access error: {0}")]
LogAccess(#[from] crate::log_access::LogAccessError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Too many label combinations for metric {metric}: {count} > {limit}")]
CardinalityLimit { metric: String, count: usize, limit: usize },
}
/// Aggregated metric value with labels
#[derive(Debug, Clone)]
pub struct AggregatedMetric {
pub name: String,
pub help: String,
pub metric_type: MetricType,
pub samples: Vec<MetricSample>,
}
/// Individual metric sample
#[derive(Debug, Clone)]
pub struct MetricSample {
pub labels: HashMap<String, String>,
pub value: f64,
pub timestamp_ms: Option<u64>,
}
/// Configuration for metrics aggregation
#[derive(Debug, Clone)]
pub struct MetricsConfig {
/// Maximum number of unique label combinations per metric (cardinality safety)
pub max_cardinality_per_metric: usize,
/// Time range for metrics collection (in hours from now)
pub time_range_hours: u64,
/// Whether to include job_id in labels (can create high cardinality)
pub include_job_id_labels: bool,
/// Maximum number of jobs to process per metric
pub max_jobs_per_metric: usize,
}
impl Default for MetricsConfig {
fn default() -> Self {
Self {
max_cardinality_per_metric: 1000, // Prometheus recommended limit
time_range_hours: 24, // Last 24 hours
include_job_id_labels: false, // Disabled by default for cardinality safety
max_jobs_per_metric: 100, // Limit recent jobs
}
}
}
/// Aggregates metrics from job logs with cardinality safety
pub struct MetricsAggregator {
log_reader: LogReader,
config: MetricsConfig,
templates: Vec<MetricTemplate>,
}
impl MetricsAggregator {
/// Create a new metrics aggregator
pub fn new<P: AsRef<Path>>(logs_path: P, config: MetricsConfig) -> Self {
Self {
log_reader: LogReader::new(logs_path),
config,
templates: get_standard_metrics(),
}
}
/// Create with default configuration
pub fn with_defaults<P: AsRef<Path>>(logs_path: P) -> Self {
Self::new(logs_path, MetricsConfig::default())
}
/// Add custom metric template
pub fn add_template(&mut self, template: MetricTemplate) {
self.templates.push(template);
}
/// Aggregate all metrics from recent job logs
pub fn aggregate_metrics(&self) -> Result<Vec<AggregatedMetric>, MetricsError> {
// Get recent job IDs
let job_ids = self.get_recent_job_ids()?;
let mut aggregated: HashMap<String, AggregatedMetric> = HashMap::new();
let mut cardinality_counters: HashMap<String, HashSet<String>> = HashMap::new();
// Process each job's logs
for job_id in job_ids.iter().take(self.config.max_jobs_per_metric) {
if let Ok(entries) = self.get_job_entries(job_id) {
for entry in entries {
self.process_entry(&entry, &mut aggregated, &mut cardinality_counters)?;
}
}
}
Ok(aggregated.into_values().collect())
}
/// Generate Prometheus format output
pub fn to_prometheus_format(&self) -> Result<String, MetricsError> {
let metrics = self.aggregate_metrics()?;
let mut output = String::new();
for metric in metrics {
// Add help comment
output.push_str(&format!("# HELP {} {}\n", metric.name, metric.help));
// Add type comment
let type_str = match metric.metric_type {
MetricType::Counter => "counter",
MetricType::Gauge => "gauge",
MetricType::Histogram => "histogram",
MetricType::Summary => "summary",
};
output.push_str(&format!("# TYPE {} {}\n", metric.name, type_str));
// Add samples
for sample in metric.samples {
output.push_str(&format!("{}{} {}\n",
metric.name,
self.format_labels(&sample.labels),
sample.value
));
}
output.push('\n');
}
Ok(output)
}
/// Get recent job IDs within the configured time range
fn get_recent_job_ids(&self) -> Result<Vec<String>, MetricsError> {
// For now, get all available jobs. In production, this would filter by date
let job_ids = self.log_reader.list_available_jobs(None)?;
Ok(job_ids)
}
/// Get log entries for a specific job
fn get_job_entries(&self, job_id: &str) -> Result<Vec<JobLogEntry>, MetricsError> {
use crate::JobLogsRequest;
let request = JobLogsRequest {
job_run_id: job_id.to_string(),
since_timestamp: 0,
min_level: 0,
limit: 1000, // Get all entries for the job
};
let response = self.log_reader.get_job_logs(&request)?;
Ok(response.entries)
}
/// Process a single log entry through all metric templates
fn process_entry(
&self,
entry: &JobLogEntry,
aggregated: &mut HashMap<String, AggregatedMetric>,
cardinality_counters: &mut HashMap<String, HashSet<String>>,
) -> Result<(), MetricsError> {
for template in &self.templates {
if let Some(mut extracted) = template.extract(entry) {
// Apply cardinality safety filters
if !self.config.include_job_id_labels {
extracted.labels.remove("job_id");
}
// Check cardinality limit
let label_signature = self.get_label_signature(&extracted.labels);
let cardinality_set = cardinality_counters
.entry(extracted.name.clone())
.or_insert_with(HashSet::new);
if cardinality_set.len() >= self.config.max_cardinality_per_metric
&& !cardinality_set.contains(&label_signature) {
// Skip this metric to avoid cardinality explosion
continue;
}
cardinality_set.insert(label_signature);
// Add to aggregated metrics
let agg_metric = aggregated
.entry(extracted.name.clone())
.or_insert_with(|| AggregatedMetric {
name: extracted.name.clone(),
help: extracted.help.clone(),
metric_type: extracted.metric_type.clone(),
samples: Vec::new(),
});
// For counters, sum values with same labels; for gauges, keep latest
let existing_sample = agg_metric.samples.iter_mut()
.find(|s| s.labels == extracted.labels);
if let Some(sample) = existing_sample {
match extracted.metric_type {
MetricType::Counter => {
sample.value += extracted.value; // Sum counters
},
MetricType::Gauge | MetricType::Histogram | MetricType::Summary => {
sample.value = extracted.value; // Replace with latest
},
}
} else {
agg_metric.samples.push(MetricSample {
labels: extracted.labels,
value: extracted.value,
timestamp_ms: None, // Could add timestamp parsing if needed
});
}
}
}
Ok(())
}
/// Generate a signature string for label combinations
fn get_label_signature(&self, labels: &HashMap<String, String>) -> String {
let mut pairs: Vec<_> = labels.iter().collect();
pairs.sort_by_key(|&(k, _)| k);
pairs.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",")
}
/// Format labels for Prometheus output
fn format_labels(&self, labels: &HashMap<String, String>) -> String {
if labels.is_empty() {
return String::new();
}
let mut pairs: Vec<_> = labels.iter().collect();
pairs.sort_by_key(|&(k, _)| k);
let formatted_pairs: Vec<String> = pairs.iter()
.map(|(k, v)| format!("{}=\"{}\"", k, self.escape_label_value(v)))
.collect();
format!("{{{}}}", formatted_pairs.join(","))
}
/// Escape label values for Prometheus format
fn escape_label_value(&self, value: &str) -> String {
value
.replace('\\', "\\\\")
.replace('"', "\\\"")
.replace('\n', "\\n")
.replace('\t', "\\t")
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{job_log_entry, PartitionRef, WrapperJobEvent};
use std::io::Write;
use tempfile::TempDir;
fn create_test_logs(temp_dir: &TempDir) -> Result<(), Box<dyn std::error::Error>> {
// Create date directory
let date_dir = temp_dir.path().join("2025-01-27");
std::fs::create_dir_all(&date_dir)?;
// Create test job file with job summary
let job_file = date_dir.join("test_job_123.jsonl");
let mut file = std::fs::File::create(&job_file)?;
let entry = JobLogEntry {
timestamp: "1753763856".to_string(),
job_id: "test_job_123".to_string(),
outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }],
sequence_number: 4,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_summary".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: {
let mut meta = HashMap::new();
meta.insert("runtime_ms".to_string(), "2500.000".to_string());
meta.insert("peak_memory_mb".to_string(), "128.5".to_string());
meta.insert("total_cpu_ms".to_string(), "1200.000".to_string());
meta.insert("exit_code".to_string(), "0".to_string());
meta
},
job_label: None,
})),
};
writeln!(file, "{}", serde_json::to_string(&entry)?)?;
// Create task_success entry
let success_entry = JobLogEntry {
timestamp: "1753763857".to_string(),
job_id: "test_job_123".to_string(),
outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }],
sequence_number: 5,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: HashMap::new(),
job_label: None,
})),
};
writeln!(file, "{}", serde_json::to_string(&success_entry)?)?;
Ok(())
}
#[test]
fn test_metrics_aggregation() {
let temp_dir = TempDir::new().unwrap();
create_test_logs(&temp_dir).unwrap();
let aggregator = MetricsAggregator::with_defaults(temp_dir.path());
let metrics = aggregator.aggregate_metrics().unwrap();
assert!(!metrics.is_empty());
// Find duration metric
let duration_metric = metrics.iter()
.find(|m| m.name == "databuild_job_duration_seconds")
.expect("Should have duration metric");
assert_eq!(duration_metric.samples.len(), 1);
assert_eq!(duration_metric.samples[0].value, 2.5); // 2500ms -> 2.5s
// Verify labels - should only have job_id (which gets excluded) and job_status
let labels = &duration_metric.samples[0].labels;
assert_eq!(labels.get("job_status").unwrap(), "JOB_COMPLETED");
assert!(!labels.contains_key("job_id")); // Should be excluded by default
// Note: job_label would only be available from manifest entries, not job_summary events
}
#[test]
fn test_prometheus_format() {
let temp_dir = TempDir::new().unwrap();
create_test_logs(&temp_dir).unwrap();
let aggregator = MetricsAggregator::with_defaults(temp_dir.path());
let prometheus_output = aggregator.to_prometheus_format().unwrap();
assert!(prometheus_output.contains("# HELP databuild_job_duration_seconds"));
assert!(prometheus_output.contains("# TYPE databuild_job_duration_seconds histogram"));
assert!(prometheus_output.contains("databuild_job_duration_seconds{"));
assert!(prometheus_output.contains("job_status=\"JOB_COMPLETED\""));
assert!(prometheus_output.contains("} 2.5"));
}
#[test]
fn test_cardinality_safety() {
let config = MetricsConfig {
max_cardinality_per_metric: 2, // Very low limit for testing
time_range_hours: 24,
include_job_id_labels: true, // Enable to test cardinality
max_jobs_per_metric: 100,
};
let temp_dir = TempDir::new().unwrap();
// Create multiple jobs to test cardinality limit
let date_dir = temp_dir.path().join("2025-01-27");
std::fs::create_dir_all(&date_dir).unwrap();
for i in 1..=5 {
let job_file = date_dir.join(format!("job_{}.jsonl", i));
let mut file = std::fs::File::create(&job_file).unwrap();
let entry = JobLogEntry {
timestamp: "1753763856".to_string(),
job_id: format!("job_{}", i),
outputs: vec![PartitionRef { r#str: format!("table_{}/date=2025-01-27", i) }],
sequence_number: 1,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: HashMap::new(),
job_label: None,
})),
};
writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
}
let aggregator = MetricsAggregator::new(temp_dir.path(), config);
let metrics = aggregator.aggregate_metrics().unwrap();
// Find the success count metric
let success_metric = metrics.iter()
.find(|m| m.name == "databuild_job_events_total")
.expect("Should have success count metric");
// Should be limited by cardinality (max 2 unique label combinations)
assert!(success_metric.samples.len() <= 2,
"Expected <= 2 samples due to cardinality limit, got {}",
success_metric.samples.len());
}
#[test]
fn test_label_escaping() {
let aggregator = MetricsAggregator::with_defaults("/tmp");
assert_eq!(aggregator.escape_label_value("normal"), "normal");
assert_eq!(aggregator.escape_label_value("with\"quotes"), "with\\\"quotes");
assert_eq!(aggregator.escape_label_value("with\\backslash"), "with\\\\backslash");
assert_eq!(aggregator.escape_label_value("with\nnewline"), "with\\nnewline");
assert_eq!(aggregator.escape_label_value("with\ttab"), "with\\ttab");
}
#[test]
fn test_label_signature_generation() {
let aggregator = MetricsAggregator::with_defaults("/tmp");
let mut labels1 = HashMap::new();
labels1.insert("job_label".to_string(), "test_job".to_string());
labels1.insert("job_status".to_string(), "JOB_COMPLETED".to_string());
let mut labels2 = HashMap::new();
labels2.insert("job_status".to_string(), "JOB_COMPLETED".to_string());
labels2.insert("job_label".to_string(), "test_job".to_string());
// Order shouldn't matter
assert_eq!(
aggregator.get_label_signature(&labels1),
aggregator.get_label_signature(&labels2)
);
let signature = aggregator.get_label_signature(&labels1);
assert!(signature.contains("job_label=test_job"));
assert!(signature.contains("job_status=JOB_COMPLETED"));
}
#[test]
fn test_counter_vs_gauge_aggregation() {
let temp_dir = TempDir::new().unwrap();
let date_dir = temp_dir.path().join("2025-01-27");
std::fs::create_dir_all(&date_dir).unwrap();
let job_file = date_dir.join("test_job.jsonl");
let mut file = std::fs::File::create(&job_file).unwrap();
// Create multiple task_success events (should be summed as counter)
for i in 1..=3 {
let entry = JobLogEntry {
timestamp: format!("175376385{}", i),
job_id: "test_job".to_string(),
outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }],
sequence_number: i,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: HashMap::new(),
job_label: None,
})),
};
writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
}
// Create job summaries with different memory values (should use latest as gauge)
for (i, memory) in ["100.0", "150.0", "120.0"].iter().enumerate() {
let entry = JobLogEntry {
timestamp: format!("175376386{}", i),
job_id: "test_job".to_string(),
outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }],
sequence_number: (i + 10) as u64,
content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
event_type: "job_summary".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: {
let mut meta = HashMap::new();
meta.insert("peak_memory_mb".to_string(), memory.to_string());
meta.insert("runtime_ms".to_string(), "1000".to_string());
meta.insert("total_cpu_ms".to_string(), "500".to_string());
meta
},
job_label: None,
})),
};
writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap();
}
let aggregator = MetricsAggregator::with_defaults(temp_dir.path());
let metrics = aggregator.aggregate_metrics().unwrap();
// Check counter behavior (task_success events should be summed)
let success_metric = metrics.iter()
.find(|m| m.name == "databuild_job_events_total")
.expect("Should have success count metric");
assert_eq!(success_metric.samples[0].value, 3.0); // 3 events summed
// Check gauge behavior (memory should be latest value)
let memory_metric = metrics.iter()
.find(|m| m.name == "databuild_job_peak_memory_mb")
.expect("Should have memory metric");
assert_eq!(memory_metric.samples[0].value, 120.0); // Latest value
}
}