From 216b5f5fb2ee22e9f6d4070d6798b8283bf23d91 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Mon, 28 Jul 2025 22:23:22 -0700 Subject: [PATCH] Implement log/metric serving --- databuild/BUILD.bazel | 2 + databuild/databuild.proto | 1 + databuild/graph/execute.rs | 6 +- databuild/job/main.rs | 10 + databuild/lib.rs | 6 + databuild/log_collector.rs | 32 +- databuild/metric_templates.rs | 523 ++++++++++++++++++++++++++++++++ databuild/metrics_aggregator.rs | 507 +++++++++++++++++++++++++++++++ databuild/service/handlers.rs | 173 +++++++++++ databuild/service/mod.rs | 5 + 10 files changed, 1260 insertions(+), 5 deletions(-) create mode 100644 databuild/metric_templates.rs create mode 100644 databuild/metrics_aggregator.rs diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel index a6143ed..1c8a9d6 100644 --- a/databuild/BUILD.bazel +++ b/databuild/BUILD.bazel @@ -45,6 +45,8 @@ rust_library( "lib.rs", "log_access.rs", "log_collector.rs", + "metric_templates.rs", + "metrics_aggregator.rs", "mermaid_utils.rs", "orchestration/error.rs", "orchestration/events.rs", diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 36a47ee..71771cb 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -322,6 +322,7 @@ message WrapperJobEvent { map metadata = 2; optional string job_status = 3; // JobStatus enum as string optional int32 exit_code = 4; + optional string job_label = 5; // Job label for low-cardinality metrics } /////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/databuild/graph/execute.rs b/databuild/graph/execute.rs index 6150732..37a988e 100644 --- a/databuild/graph/execute.rs +++ b/databuild/graph/execute.rs @@ -186,7 +186,11 @@ fn worker( // Initialize log collector let mut log_collector = match LogCollector::new(LogCollector::default_logs_dir()) { - Ok(collector) => collector, + Ok(mut collector) => { + // Set the job label mapping for this job run + 
collector.set_job_label(&job_run_id, &task.job.as_ref().unwrap().label); + collector + }, Err(e) => { let err_msg = format!("[Worker {}] Failed to initialize log collector for {}: {}", worker_id, task.job.as_ref().unwrap().label, e); diff --git a/databuild/job/main.rs b/databuild/job/main.rs index bb12e5b..03feb52 100644 --- a/databuild/job/main.rs +++ b/databuild/job/main.rs @@ -110,6 +110,7 @@ impl JobWrapper { metadata: std::collections::HashMap::new(), job_status: None, exit_code: None, + job_label: None, // Will be enriched by LogCollector }), ); @@ -164,6 +165,7 @@ impl JobWrapper { job_status: None, exit_code: None, metadata: std::collections::HashMap::new(), + job_label: None, // Will be enriched by LogCollector }), ); @@ -175,6 +177,7 @@ impl JobWrapper { job_status: None, exit_code: None, metadata: std::collections::HashMap::new(), + job_label: None, // Will be enriched by LogCollector }), ); @@ -256,6 +259,7 @@ impl JobWrapper { job_status: None, exit_code: None, metadata, + job_label: None, // Will be enriched by LogCollector })), }; @@ -419,6 +423,7 @@ impl JobWrapper { job_status: None, exit_code: Some(exit_code), metadata: summary_metadata, + job_label: None, // Will be enriched by LogCollector }), ); @@ -431,6 +436,7 @@ impl JobWrapper { job_status: Some("JOB_COMPLETED".to_string()), exit_code: Some(exit_code), metadata: std::collections::HashMap::new(), + job_label: None, // Will be enriched by LogCollector }), ); @@ -466,6 +472,7 @@ impl JobWrapper { job_status: Some("JOB_FAILED".to_string()), exit_code: Some(exit_code), metadata: std::collections::HashMap::new(), + job_label: None, // Will be enriched by LogCollector }), ); @@ -484,6 +491,7 @@ impl JobWrapper { ); meta }, + job_label: None, // Will be enriched by LogCollector }), ); } @@ -655,6 +663,7 @@ mod tests { job_status: Some("JOB_COMPLETED".to_string()), exit_code: Some(0), metadata: std::collections::HashMap::new(), + job_label: None, }; assert_eq!(event.event_type, "task_success"); 
assert_eq!(event.job_status, Some("JOB_COMPLETED".to_string())); @@ -666,6 +675,7 @@ mod tests { job_status: Some("JOB_FAILED".to_string()), exit_code: Some(1), metadata: std::collections::HashMap::new(), + job_label: None, }; assert_eq!(event.event_type, "task_failed"); assert_eq!(event.job_status, Some("JOB_FAILED".to_string())); diff --git a/databuild/lib.rs b/databuild/lib.rs index 8897f38..659c842 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -24,6 +24,12 @@ pub mod log_collector; // Log access module pub mod log_access; +// Metric templates module +pub mod metric_templates; + +// Metrics aggregator module +pub mod metrics_aggregator; + // Format consistency tests #[cfg(test)] mod format_consistency_test; diff --git a/databuild/log_collector.rs b/databuild/log_collector.rs index d512f04..95a34c3 100644 --- a/databuild/log_collector.rs +++ b/databuild/log_collector.rs @@ -1,8 +1,8 @@ -use crate::JobLogEntry; +use crate::{JobLogEntry, job_log_entry}; use serde_json; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, Write}; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; use thiserror::Error; @@ -20,6 +20,7 @@ pub enum LogCollectorError { pub struct LogCollector { logs_dir: PathBuf, active_files: HashMap, + job_label_mapping: HashMap, // job_run_id -> job_label } impl LogCollector { @@ -34,9 +35,15 @@ impl LogCollector { Ok(Self { logs_dir, active_files: HashMap::new(), + job_label_mapping: HashMap::new(), }) } + /// Set the job label for a specific job run ID + pub fn set_job_label(&mut self, job_run_id: &str, job_label: &str) { + self.job_label_mapping.insert(job_run_id.to_string(), job_label.to_string()); + } + /// Get the default logs directory based on environment variable or fallback pub fn default_logs_dir() -> PathBuf { std::env::var("DATABUILD_LOGS_DIR") @@ -109,7 +116,7 @@ impl LogCollector { // Try to parse as JobLogEntry match 
serde_json::from_str::(&line) { - Ok(entry) => { + Ok(mut entry) => { // Validate that the job_id matches if entry.job_id != job_run_id { return Err(LogCollectorError::InvalidLogEntry( @@ -117,6 +124,23 @@ impl LogCollector { )); } + // Enrich WrapperJobEvent and Manifest with job_label if available + if let Some(job_label) = self.job_label_mapping.get(job_run_id) { + match &mut entry.content { + Some(job_log_entry::Content::JobEvent(ref mut job_event)) => { + job_event.job_label = Some(job_label.clone()); + } + Some(job_log_entry::Content::Manifest(ref mut manifest)) => { + if let Some(ref mut task) = manifest.task { + if let Some(ref mut job) = task.job { + job.label = job_label.clone(); + } + } + } + _ => {} // No enrichment needed for Log entries + } + } + self.write_log_entry(job_run_id, &entry)?; } Err(_) => { @@ -165,7 +189,7 @@ impl LogCollector { #[cfg(test)] mod tests { use super::*; - use crate::{job_log_entry, log_message, LogMessage, PartitionRef, WrapperJobEvent}; + use crate::{job_log_entry, log_message, LogMessage, PartitionRef}; use std::io::Cursor; use tempfile::TempDir; diff --git a/databuild/metric_templates.rs b/databuild/metric_templates.rs new file mode 100644 index 0000000..139f34a --- /dev/null +++ b/databuild/metric_templates.rs @@ -0,0 +1,523 @@ +use crate::{JobLogEntry, job_log_entry, WrapperJobEvent}; +use std::collections::HashMap; + +/// Template for metric extraction from job events +#[derive(Debug, Clone)] +pub struct MetricTemplate { + pub name: String, + pub help: String, + pub metric_type: MetricType, + pub extractor: MetricExtractor, + pub labels: Vec, // Static label names for this metric +} + +/// Prometheus metric types +#[derive(Debug, Clone)] +pub enum MetricType { + Counter, + Gauge, + Histogram, + Summary, +} + +/// Strategy for extracting metric values from job events +#[derive(Debug, Clone)] +pub enum MetricExtractor { + /// Extract from job event metadata by key + EventMetadata { + event_type: String, + metadata_key: 
String, + /// Optional conversion function name for non-numeric values + converter: Option, + }, + /// Count occurrences of specific event types + EventCount { + event_type: String, + }, + /// Extract job duration from start/end events + JobDuration, + /// Extract peak memory from job summary + PeakMemory, + /// Extract total CPU time from job summary + TotalCpuTime, + /// Extract exit code from job events + ExitCode, +} + +/// Converters for non-numeric metadata values +#[derive(Debug, Clone)] +pub enum MetricConverter { + /// Convert boolean strings to 0/1 + BoolToFloat, + /// Convert status strings to numeric codes + StatusToCode(HashMap), + /// Parse duration strings like "123ms" to seconds + DurationToSeconds, +} + +/// Result of metric extraction +#[derive(Debug)] +pub struct ExtractedMetric { + pub name: String, + pub value: f64, + pub labels: HashMap, + pub help: String, + pub metric_type: MetricType, +} + +impl MetricTemplate { + /// Extract a metric from a job log entry if applicable + pub fn extract(&self, entry: &JobLogEntry) -> Option { + let value = match &self.extractor { + MetricExtractor::EventMetadata { event_type, metadata_key, converter } => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if event.event_type == *event_type { + if let Some(raw_value) = event.metadata.get(metadata_key) { + self.convert_value(raw_value, converter)? + } else { + return None; + } + } else { + return None; + } + } else { + return None; + } + }, + MetricExtractor::EventCount { event_type } => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if event.event_type == *event_type { + 1.0 + } else { + return None; + } + } else { + return None; + } + }, + MetricExtractor::JobDuration => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if event.event_type == "job_summary" { + if let Some(runtime_str) = event.metadata.get("runtime_ms") { + runtime_str.parse::().ok()? 
/ 1000.0 // Convert to seconds + } else { + return None; + } + } else { + return None; + } + } else { + return None; + } + }, + MetricExtractor::PeakMemory => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if event.event_type == "job_summary" { + if let Some(memory_str) = event.metadata.get("peak_memory_mb") { + memory_str.parse::().ok()? + } else { + return None; + } + } else { + return None; + } + } else { + return None; + } + }, + MetricExtractor::TotalCpuTime => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if event.event_type == "job_summary" { + if let Some(cpu_str) = event.metadata.get("total_cpu_ms") { + cpu_str.parse::().ok()? / 1000.0 // Convert to seconds + } else { + return None; + } + } else { + return None; + } + } else { + return None; + } + }, + MetricExtractor::ExitCode => { + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if let Some(exit_code) = event.exit_code { + exit_code as f64 + } else { + return None; + } + } else { + return None; + } + }, + }; + + // Generate labels for this metric + let mut labels = HashMap::new(); + + // Always include job_id as a label (but this is excluded by default for cardinality safety) + labels.insert("job_id".to_string(), entry.job_id.clone()); + + // Extract job label from manifest if available - this is the low-cardinality identifier + if let Some(job_log_entry::Content::Manifest(manifest)) = &entry.content { + if let Some(task) = &manifest.task { + if let Some(job) = &task.job { + labels.insert("job_label".to_string(), job.label.clone()); + } + } + } + + // Add job status and job label if available from job events + if let Some(job_log_entry::Content::JobEvent(event)) = &entry.content { + if let Some(job_status) = &event.job_status { + labels.insert("job_status".to_string(), job_status.clone()); + } + if let Some(job_label) = &event.job_label { + labels.insert("job_label".to_string(), job_label.clone()); + } + } + + 
Some(ExtractedMetric { + name: self.name.clone(), + value, + labels, + help: self.help.clone(), + metric_type: self.metric_type.clone(), + }) + } + + fn convert_value(&self, raw_value: &str, converter: &Option) -> Option { + match converter { + None => raw_value.parse().ok(), + Some(MetricConverter::BoolToFloat) => { + match raw_value.to_lowercase().as_str() { + "true" | "1" | "yes" => Some(1.0), + "false" | "0" | "no" => Some(0.0), + _ => None, + } + }, + Some(MetricConverter::StatusToCode(mapping)) => { + mapping.get(raw_value).copied() + }, + Some(MetricConverter::DurationToSeconds) => { + // Parse formats like "123ms", "45s", "2.5m" + if raw_value.ends_with("ms") { + raw_value.trim_end_matches("ms").parse::().ok().map(|v| v / 1000.0) + } else if raw_value.ends_with("s") { + raw_value.trim_end_matches("s").parse::().ok() + } else if raw_value.ends_with("m") { + raw_value.trim_end_matches("m").parse::().ok().map(|v| v * 60.0) + } else { + raw_value.parse::().ok() + } + }, + } + } +} + + +/// Get standard DataBuild metric templates +pub fn get_standard_metrics() -> Vec { + vec![ + // Job execution metrics + MetricTemplate { + name: "databuild_job_duration_seconds".to_string(), + help: "Duration of job execution in seconds".to_string(), + metric_type: MetricType::Histogram, + extractor: MetricExtractor::JobDuration, + labels: vec!["job_label".to_string()], + }, + MetricTemplate { + name: "databuild_job_peak_memory_mb".to_string(), + help: "Peak memory usage of job in megabytes".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::PeakMemory, + labels: vec!["job_label".to_string()], + }, + MetricTemplate { + name: "databuild_job_cpu_time_seconds".to_string(), + help: "Total CPU time consumed by job in seconds".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::TotalCpuTime, + labels: vec!["job_label".to_string()], + }, + MetricTemplate { + name: "databuild_job_exit_code".to_string(), + help: "Exit code of job 
execution".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::ExitCode, + labels: vec!["job_label".to_string(), "job_status".to_string()], + }, + + // Job event counters + MetricTemplate { + name: "databuild_job_events_total".to_string(), + help: "Total number of job events".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::EventCount { event_type: "task_success".to_string() }, + labels: vec!["job_label".to_string()], + }, + MetricTemplate { + name: "databuild_job_failures_total".to_string(), + help: "Total number of job failures".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::EventCount { event_type: "task_failed".to_string() }, + labels: vec!["job_label".to_string()], + }, + MetricTemplate { + name: "databuild_heartbeats_total".to_string(), + help: "Total number of heartbeat events".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::EventCount { event_type: "heartbeat".to_string() }, + labels: vec!["job_label".to_string()], + }, + ] +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{PartitionRef, log_message, LogMessage}; + + fn create_test_job_summary_entry(job_id: &str, runtime_ms: &str, memory_mb: &str, cpu_ms: &str, exit_code: i32) -> JobLogEntry { + let mut metadata = HashMap::new(); + metadata.insert("runtime_ms".to_string(), runtime_ms.to_string()); + metadata.insert("peak_memory_mb".to_string(), memory_mb.to_string()); + metadata.insert("total_cpu_ms".to_string(), cpu_ms.to_string()); + metadata.insert("exit_code".to_string(), exit_code.to_string()); + + JobLogEntry { + timestamp: "1234567890".to_string(), + job_id: job_id.to_string(), + outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }], + sequence_number: 1, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "job_summary".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(exit_code), + metadata, 
+ job_label: None, + })), + } + } + + fn create_test_task_success_entry(job_id: &str) -> JobLogEntry { + JobLogEntry { + timestamp: "1234567890".to_string(), + job_id: job_id.to_string(), + outputs: vec![PartitionRef { r#str: "podcasts/date=2025-01-27".to_string() }], + sequence_number: 2, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "task_success".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: HashMap::new(), + job_label: None, + })), + } + } + + #[test] + fn test_job_duration_extraction() { + let template = MetricTemplate { + name: "test_duration".to_string(), + help: "Test duration".to_string(), + metric_type: MetricType::Histogram, + extractor: MetricExtractor::JobDuration, + labels: vec![], + }; + + let entry = create_test_job_summary_entry("test-job", "2500", "64.5", "1200", 0); + let metric = template.extract(&entry).unwrap(); + + assert_eq!(metric.name, "test_duration"); + assert_eq!(metric.value, 2.5); // 2500ms -> 2.5s + assert_eq!(metric.labels.get("job_id").unwrap(), "test-job"); + // Note: job_label would only be available from manifest entries, not job_summary + } + + #[test] + fn test_memory_extraction() { + let template = MetricTemplate { + name: "test_memory".to_string(), + help: "Test memory".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::PeakMemory, + labels: vec![], + }; + + let entry = create_test_job_summary_entry("test-job", "2500", "128.75", "1200", 0); + let metric = template.extract(&entry).unwrap(); + + assert_eq!(metric.value, 128.75); + } + + #[test] + fn test_cpu_time_extraction() { + let template = MetricTemplate { + name: "test_cpu".to_string(), + help: "Test CPU".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::TotalCpuTime, + labels: vec![], + }; + + let entry = create_test_job_summary_entry("test-job", "2500", "64.5", "1500", 0); + let metric = template.extract(&entry).unwrap(); + + 
assert_eq!(metric.value, 1.5); // 1500ms -> 1.5s + } + + #[test] + fn test_exit_code_extraction() { + let template = MetricTemplate { + name: "test_exit_code".to_string(), + help: "Test exit code".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::ExitCode, + labels: vec![], + }; + + let entry = create_test_job_summary_entry("test-job", "2500", "64.5", "1200", 42); + let metric = template.extract(&entry).unwrap(); + + assert_eq!(metric.value, 42.0); + assert_eq!(metric.labels.get("job_status").unwrap(), "JOB_COMPLETED"); + } + + #[test] + fn test_event_count_extraction() { + let template = MetricTemplate { + name: "test_success_count".to_string(), + help: "Test success count".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::EventCount { event_type: "task_success".to_string() }, + labels: vec![], + }; + + let entry = create_test_task_success_entry("test-job"); + let metric = template.extract(&entry).unwrap(); + + assert_eq!(metric.value, 1.0); + // Note: job_label would only be available from manifest entries, not job events + } + + #[test] + fn test_event_metadata_extraction() { + let template = MetricTemplate { + name: "test_runtime".to_string(), + help: "Test runtime from metadata".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::EventMetadata { + event_type: "job_summary".to_string(), + metadata_key: "runtime_ms".to_string(), + converter: None, + }, + labels: vec![], + }; + + let entry = create_test_job_summary_entry("test-job", "3000", "64.5", "1200", 0); + let metric = template.extract(&entry).unwrap(); + + assert_eq!(metric.value, 3000.0); + } + + + #[test] + fn test_bool_converter() { + let template = MetricTemplate { + name: "test_bool".to_string(), + help: "Test bool".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::EventMetadata { + event_type: "test_event".to_string(), + metadata_key: "success".to_string(), + converter: 
Some(MetricConverter::BoolToFloat), + }, + labels: vec![], + }; + + assert_eq!(template.convert_value("true", &Some(MetricConverter::BoolToFloat)), Some(1.0)); + assert_eq!(template.convert_value("false", &Some(MetricConverter::BoolToFloat)), Some(0.0)); + assert_eq!(template.convert_value("yes", &Some(MetricConverter::BoolToFloat)), Some(1.0)); + assert_eq!(template.convert_value("no", &Some(MetricConverter::BoolToFloat)), Some(0.0)); + assert_eq!(template.convert_value("invalid", &Some(MetricConverter::BoolToFloat)), None); + } + + #[test] + fn test_duration_converter() { + let template = MetricTemplate { + name: "test_duration".to_string(), + help: "Test duration".to_string(), + metric_type: MetricType::Gauge, + extractor: MetricExtractor::EventMetadata { + event_type: "test_event".to_string(), + metadata_key: "duration".to_string(), + converter: Some(MetricConverter::DurationToSeconds), + }, + labels: vec![], + }; + + assert_eq!(template.convert_value("1000ms", &Some(MetricConverter::DurationToSeconds)), Some(1.0)); + assert_eq!(template.convert_value("5s", &Some(MetricConverter::DurationToSeconds)), Some(5.0)); + assert_eq!(template.convert_value("2.5m", &Some(MetricConverter::DurationToSeconds)), Some(150.0)); + assert_eq!(template.convert_value("42", &Some(MetricConverter::DurationToSeconds)), Some(42.0)); + } + + #[test] + fn test_standard_metrics() { + let metrics = get_standard_metrics(); + assert!(!metrics.is_empty()); + + // Verify we have the key metrics + let metric_names: Vec<&String> = metrics.iter().map(|m| &m.name).collect(); + assert!(metric_names.contains(&&"databuild_job_duration_seconds".to_string())); + assert!(metric_names.contains(&&"databuild_job_peak_memory_mb".to_string())); + assert!(metric_names.contains(&&"databuild_job_cpu_time_seconds".to_string())); + assert!(metric_names.contains(&&"databuild_job_failures_total".to_string())); + } + + #[test] + fn test_no_extraction_for_wrong_event_type() { + let template = MetricTemplate { + 
name: "test_metric".to_string(), + help: "Test".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::EventCount { event_type: "task_failed".to_string() }, + labels: vec![], + }; + + let entry = create_test_task_success_entry("test-job"); // This is task_success, not task_failed + let result = template.extract(&entry); + + assert!(result.is_none()); + } + + #[test] + fn test_no_extraction_for_log_entries() { + let template = MetricTemplate { + name: "test_metric".to_string(), + help: "Test".to_string(), + metric_type: MetricType::Counter, + extractor: MetricExtractor::JobDuration, + labels: vec![], + }; + + // Create a log entry instead of job event + let entry = JobLogEntry { + timestamp: "1234567890".to_string(), + job_id: "test-job".to_string(), + outputs: vec![PartitionRef { r#str: "test/partition".to_string() }], + sequence_number: 1, + content: Some(job_log_entry::Content::Log(LogMessage { + level: log_message::LogLevel::Info as i32, + message: "Test log message".to_string(), + fields: HashMap::new(), + })), + }; + + let result = template.extract(&entry); + assert!(result.is_none()); + } +} \ No newline at end of file diff --git a/databuild/metrics_aggregator.rs b/databuild/metrics_aggregator.rs new file mode 100644 index 0000000..69c532a --- /dev/null +++ b/databuild/metrics_aggregator.rs @@ -0,0 +1,507 @@ +use crate::{JobLogEntry, log_access::LogReader, metric_templates::{MetricTemplate, ExtractedMetric, MetricType, get_standard_metrics}}; +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum MetricsError { + #[error("Log access error: {0}")] + LogAccess(#[from] crate::log_access::LogAccessError), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Too many label combinations for metric {metric}: {count} > {limit}")] + CardinalityLimit { metric: String, count: usize, limit: usize }, +} + +/// Aggregated metric value with labels +#[derive(Debug, 
Clone)] +pub struct AggregatedMetric { + pub name: String, + pub help: String, + pub metric_type: MetricType, + pub samples: Vec, +} + +/// Individual metric sample +#[derive(Debug, Clone)] +pub struct MetricSample { + pub labels: HashMap, + pub value: f64, + pub timestamp_ms: Option, +} + +/// Configuration for metrics aggregation +#[derive(Debug, Clone)] +pub struct MetricsConfig { + /// Maximum number of unique label combinations per metric (cardinality safety) + pub max_cardinality_per_metric: usize, + /// Time range for metrics collection (in hours from now) + pub time_range_hours: u64, + /// Whether to include job_id in labels (can create high cardinality) + pub include_job_id_labels: bool, + /// Maximum number of jobs to process per metric + pub max_jobs_per_metric: usize, +} + +impl Default for MetricsConfig { + fn default() -> Self { + Self { + max_cardinality_per_metric: 1000, // Prometheus recommended limit + time_range_hours: 24, // Last 24 hours + include_job_id_labels: false, // Disabled by default for cardinality safety + max_jobs_per_metric: 100, // Limit recent jobs + } + } +} + +/// Aggregates metrics from job logs with cardinality safety +pub struct MetricsAggregator { + log_reader: LogReader, + config: MetricsConfig, + templates: Vec, +} + +impl MetricsAggregator { + /// Create a new metrics aggregator + pub fn new>(logs_path: P, config: MetricsConfig) -> Self { + Self { + log_reader: LogReader::new(logs_path), + config, + templates: get_standard_metrics(), + } + } + + /// Create with default configuration + pub fn with_defaults>(logs_path: P) -> Self { + Self::new(logs_path, MetricsConfig::default()) + } + + /// Add custom metric template + pub fn add_template(&mut self, template: MetricTemplate) { + self.templates.push(template); + } + + /// Aggregate all metrics from recent job logs + pub fn aggregate_metrics(&self) -> Result, MetricsError> { + // Get recent job IDs + let job_ids = self.get_recent_job_ids()?; + + let mut aggregated: HashMap = 
HashMap::new(); + let mut cardinality_counters: HashMap> = HashMap::new(); + + // Process each job's logs + for job_id in job_ids.iter().take(self.config.max_jobs_per_metric) { + if let Ok(entries) = self.get_job_entries(job_id) { + for entry in entries { + self.process_entry(&entry, &mut aggregated, &mut cardinality_counters)?; + } + } + } + + Ok(aggregated.into_values().collect()) + } + + /// Generate Prometheus format output + pub fn to_prometheus_format(&self) -> Result { + let metrics = self.aggregate_metrics()?; + let mut output = String::new(); + + for metric in metrics { + // Add help comment + output.push_str(&format!("# HELP {} {}\n", metric.name, metric.help)); + + // Add type comment + let type_str = match metric.metric_type { + MetricType::Counter => "counter", + MetricType::Gauge => "gauge", + MetricType::Histogram => "histogram", + MetricType::Summary => "summary", + }; + output.push_str(&format!("# TYPE {} {}\n", metric.name, type_str)); + + // Add samples + for sample in metric.samples { + output.push_str(&format!("{}{} {}\n", + metric.name, + self.format_labels(&sample.labels), + sample.value + )); + } + output.push('\n'); + } + + Ok(output) + } + + /// Get recent job IDs within the configured time range + fn get_recent_job_ids(&self) -> Result, MetricsError> { + // For now, get all available jobs. 
In production, this would filter by date + let job_ids = self.log_reader.list_available_jobs(None)?; + Ok(job_ids) + } + + /// Get log entries for a specific job + fn get_job_entries(&self, job_id: &str) -> Result, MetricsError> { + use crate::JobLogsRequest; + + let request = JobLogsRequest { + job_run_id: job_id.to_string(), + since_timestamp: 0, + min_level: 0, + limit: 1000, // Get all entries for the job + }; + + let response = self.log_reader.get_job_logs(&request)?; + Ok(response.entries) + } + + /// Process a single log entry through all metric templates + fn process_entry( + &self, + entry: &JobLogEntry, + aggregated: &mut HashMap, + cardinality_counters: &mut HashMap>, + ) -> Result<(), MetricsError> { + for template in &self.templates { + if let Some(mut extracted) = template.extract(entry) { + // Apply cardinality safety filters + if !self.config.include_job_id_labels { + extracted.labels.remove("job_id"); + } + + // Check cardinality limit + let label_signature = self.get_label_signature(&extracted.labels); + let cardinality_set = cardinality_counters + .entry(extracted.name.clone()) + .or_insert_with(HashSet::new); + + if cardinality_set.len() >= self.config.max_cardinality_per_metric + && !cardinality_set.contains(&label_signature) { + // Skip this metric to avoid cardinality explosion + continue; + } + + cardinality_set.insert(label_signature); + + // Add to aggregated metrics + let agg_metric = aggregated + .entry(extracted.name.clone()) + .or_insert_with(|| AggregatedMetric { + name: extracted.name.clone(), + help: extracted.help.clone(), + metric_type: extracted.metric_type.clone(), + samples: Vec::new(), + }); + + // For counters, sum values with same labels; for gauges, keep latest + let existing_sample = agg_metric.samples.iter_mut() + .find(|s| s.labels == extracted.labels); + + if let Some(sample) = existing_sample { + match extracted.metric_type { + MetricType::Counter => { + sample.value += extracted.value; // Sum counters + }, + 
MetricType::Gauge | MetricType::Histogram | MetricType::Summary => { + sample.value = extracted.value; // Replace with latest + }, + } + } else { + agg_metric.samples.push(MetricSample { + labels: extracted.labels, + value: extracted.value, + timestamp_ms: None, // Could add timestamp parsing if needed + }); + } + } + } + + Ok(()) + } + + /// Generate a signature string for label combinations + fn get_label_signature(&self, labels: &HashMap) -> String { + let mut pairs: Vec<_> = labels.iter().collect(); + pairs.sort_by_key(|&(k, _)| k); + pairs.iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(",") + } + + /// Format labels for Prometheus output + fn format_labels(&self, labels: &HashMap) -> String { + if labels.is_empty() { + return String::new(); + } + + let mut pairs: Vec<_> = labels.iter().collect(); + pairs.sort_by_key(|&(k, _)| k); + + let formatted_pairs: Vec = pairs.iter() + .map(|(k, v)| format!("{}=\"{}\"", k, self.escape_label_value(v))) + .collect(); + + format!("{{{}}}", formatted_pairs.join(",")) + } + + /// Escape label values for Prometheus format + fn escape_label_value(&self, value: &str) -> String { + value + .replace('\\', "\\\\") + .replace('"', "\\\"") + .replace('\n', "\\n") + .replace('\t', "\\t") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{job_log_entry, PartitionRef, WrapperJobEvent}; + use std::io::Write; + use tempfile::TempDir; + + fn create_test_logs(temp_dir: &TempDir) -> Result<(), Box> { + // Create date directory + let date_dir = temp_dir.path().join("2025-01-27"); + std::fs::create_dir_all(&date_dir)?; + + // Create test job file with job summary + let job_file = date_dir.join("test_job_123.jsonl"); + let mut file = std::fs::File::create(&job_file)?; + + let entry = JobLogEntry { + timestamp: "1753763856".to_string(), + job_id: "test_job_123".to_string(), + outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }], + sequence_number: 4, + content: 
Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "job_summary".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: { + let mut meta = HashMap::new(); + meta.insert("runtime_ms".to_string(), "2500.000".to_string()); + meta.insert("peak_memory_mb".to_string(), "128.5".to_string()); + meta.insert("total_cpu_ms".to_string(), "1200.000".to_string()); + meta.insert("exit_code".to_string(), "0".to_string()); + meta + }, + job_label: None, + })), + }; + + writeln!(file, "{}", serde_json::to_string(&entry)?)?; + + // Create task_success entry + let success_entry = JobLogEntry { + timestamp: "1753763857".to_string(), + job_id: "test_job_123".to_string(), + outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }], + sequence_number: 5, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "task_success".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: HashMap::new(), + job_label: None, + })), + }; + + writeln!(file, "{}", serde_json::to_string(&success_entry)?)?; + + Ok(()) + } + + #[test] + fn test_metrics_aggregation() { + let temp_dir = TempDir::new().unwrap(); + create_test_logs(&temp_dir).unwrap(); + + let aggregator = MetricsAggregator::with_defaults(temp_dir.path()); + let metrics = aggregator.aggregate_metrics().unwrap(); + + assert!(!metrics.is_empty()); + + // Find duration metric + let duration_metric = metrics.iter() + .find(|m| m.name == "databuild_job_duration_seconds") + .expect("Should have duration metric"); + + assert_eq!(duration_metric.samples.len(), 1); + assert_eq!(duration_metric.samples[0].value, 2.5); // 2500ms -> 2.5s + + // Verify labels - should only have job_id (which gets excluded) and job_status + let labels = &duration_metric.samples[0].labels; + assert_eq!(labels.get("job_status").unwrap(), "JOB_COMPLETED"); + assert!(!labels.contains_key("job_id")); // Should be excluded by default + // 
Note: job_label would only be available from manifest entries, not job_summary events + } + + #[test] + fn test_prometheus_format() { + let temp_dir = TempDir::new().unwrap(); + create_test_logs(&temp_dir).unwrap(); + + let aggregator = MetricsAggregator::with_defaults(temp_dir.path()); + let prometheus_output = aggregator.to_prometheus_format().unwrap(); + + assert!(prometheus_output.contains("# HELP databuild_job_duration_seconds")); + assert!(prometheus_output.contains("# TYPE databuild_job_duration_seconds histogram")); + assert!(prometheus_output.contains("databuild_job_duration_seconds{")); + assert!(prometheus_output.contains("job_status=\"JOB_COMPLETED\"")); + assert!(prometheus_output.contains("} 2.5")); + } + + #[test] + fn test_cardinality_safety() { + let config = MetricsConfig { + max_cardinality_per_metric: 2, // Very low limit for testing + time_range_hours: 24, + include_job_id_labels: true, // Enable to test cardinality + max_jobs_per_metric: 100, + }; + + let temp_dir = TempDir::new().unwrap(); + + // Create multiple jobs to test cardinality limit + let date_dir = temp_dir.path().join("2025-01-27"); + std::fs::create_dir_all(&date_dir).unwrap(); + + for i in 1..=5 { + let job_file = date_dir.join(format!("job_{}.jsonl", i)); + let mut file = std::fs::File::create(&job_file).unwrap(); + + let entry = JobLogEntry { + timestamp: "1753763856".to_string(), + job_id: format!("job_{}", i), + outputs: vec![PartitionRef { r#str: format!("table_{}/date=2025-01-27", i) }], + sequence_number: 1, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "task_success".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: HashMap::new(), + job_label: None, + })), + }; + + writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + } + + let aggregator = MetricsAggregator::new(temp_dir.path(), config); + let metrics = aggregator.aggregate_metrics().unwrap(); + + // Find the success 
count metric + let success_metric = metrics.iter() + .find(|m| m.name == "databuild_job_events_total") + .expect("Should have success count metric"); + + // Should be limited by cardinality (max 2 unique label combinations) + assert!(success_metric.samples.len() <= 2, + "Expected <= 2 samples due to cardinality limit, got {}", + success_metric.samples.len()); + } + + #[test] + fn test_label_escaping() { + let aggregator = MetricsAggregator::with_defaults("/tmp"); + + assert_eq!(aggregator.escape_label_value("normal"), "normal"); + assert_eq!(aggregator.escape_label_value("with\"quotes"), "with\\\"quotes"); + assert_eq!(aggregator.escape_label_value("with\\backslash"), "with\\\\backslash"); + assert_eq!(aggregator.escape_label_value("with\nnewline"), "with\\nnewline"); + assert_eq!(aggregator.escape_label_value("with\ttab"), "with\\ttab"); + } + + #[test] + fn test_label_signature_generation() { + let aggregator = MetricsAggregator::with_defaults("/tmp"); + + let mut labels1 = HashMap::new(); + labels1.insert("job_label".to_string(), "test_job".to_string()); + labels1.insert("job_status".to_string(), "JOB_COMPLETED".to_string()); + + let mut labels2 = HashMap::new(); + labels2.insert("job_status".to_string(), "JOB_COMPLETED".to_string()); + labels2.insert("job_label".to_string(), "test_job".to_string()); + + // Order shouldn't matter + assert_eq!( + aggregator.get_label_signature(&labels1), + aggregator.get_label_signature(&labels2) + ); + + let signature = aggregator.get_label_signature(&labels1); + assert!(signature.contains("job_label=test_job")); + assert!(signature.contains("job_status=JOB_COMPLETED")); + } + + #[test] + fn test_counter_vs_gauge_aggregation() { + let temp_dir = TempDir::new().unwrap(); + let date_dir = temp_dir.path().join("2025-01-27"); + std::fs::create_dir_all(&date_dir).unwrap(); + + let job_file = date_dir.join("test_job.jsonl"); + let mut file = std::fs::File::create(&job_file).unwrap(); + + // Create multiple task_success events (should 
be summed as counter) + for i in 1..=3 { + let entry = JobLogEntry { + timestamp: format!("175376385{}", i), + job_id: "test_job".to_string(), + outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }], + sequence_number: i, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "task_success".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: HashMap::new(), + job_label: None, + })), + }; + writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + } + + // Create job summaries with different memory values (should use latest as gauge) + for (i, memory) in ["100.0", "150.0", "120.0"].iter().enumerate() { + let entry = JobLogEntry { + timestamp: format!("175376386{}", i), + job_id: "test_job".to_string(), + outputs: vec![PartitionRef { r#str: "reviews/date=2025-01-27".to_string() }], + sequence_number: (i + 10) as u64, + content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { + event_type: "job_summary".to_string(), + job_status: Some("JOB_COMPLETED".to_string()), + exit_code: Some(0), + metadata: { + let mut meta = HashMap::new(); + meta.insert("peak_memory_mb".to_string(), memory.to_string()); + meta.insert("runtime_ms".to_string(), "1000".to_string()); + meta.insert("total_cpu_ms".to_string(), "500".to_string()); + meta + }, + job_label: None, + })), + }; + writeln!(file, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); + } + + let aggregator = MetricsAggregator::with_defaults(temp_dir.path()); + let metrics = aggregator.aggregate_metrics().unwrap(); + + // Check counter behavior (task_success events should be summed) + let success_metric = metrics.iter() + .find(|m| m.name == "databuild_job_events_total") + .expect("Should have success count metric"); + assert_eq!(success_metric.samples[0].value, 3.0); // 3 events summed + + // Check gauge behavior (memory should be latest value) + let memory_metric = metrics.iter() + .find(|m| m.name == 
"databuild_job_peak_memory_mb") + .expect("Should have memory metric"); + assert_eq!(memory_metric.samples[0].value, 120.0); // Latest value + } +} \ No newline at end of file diff --git a/databuild/service/handlers.rs b/databuild/service/handlers.rs index 3a7eb21..1c44f7e 100644 --- a/databuild/service/handlers.rs +++ b/databuild/service/handlers.rs @@ -1506,4 +1506,177 @@ pub async fn cancel_build_repository( #[derive(Deserialize, JsonSchema)] pub struct CancelBuildRepositoryRequest { pub reason: String, +} + +// === Job Logs and Metrics Endpoints === + +use crate::{log_access::LogReader, metrics_aggregator::{MetricsAggregator, MetricsConfig}, JobLogsRequest}; +use serde::Serialize; + +/// Path parameter for job logs endpoint +#[derive(Deserialize, JsonSchema)] +pub struct JobLogsPathRequest { + pub job_run_id: String, +} + +/// Query parameters for job logs endpoint +#[derive(Deserialize, JsonSchema)] +pub struct JobLogsQueryRequest { + #[serde(default)] + pub since_timestamp: i64, + #[serde(default)] + pub min_level: i32, + #[serde(default = "default_logs_limit")] + pub limit: u32, +} + +fn default_logs_limit() -> u32 { + 1000 +} + +/// Response for job logs endpoint +#[derive(Serialize, JsonSchema)] +pub struct JobLogsApiResponse { + pub entries: Vec<crate::JobLogEntry>, + pub has_more: bool, +} + +/// Get job logs for a specific job run ID +pub async fn get_job_logs( + Path(JobLogsPathRequest { job_run_id }): Path<JobLogsPathRequest>, + axum::extract::Query(query): axum::extract::Query<JobLogsQueryRequest>, +) -> Result<Json<JobLogsApiResponse>, (StatusCode, Json<ErrorResponse>)> { + let log_reader = LogReader::default(); + + let request = JobLogsRequest { + job_run_id, + since_timestamp: query.since_timestamp, + min_level: query.min_level, + limit: query.limit, + }; + + match log_reader.get_job_logs(&request) { + Ok(response) => Ok(Json(JobLogsApiResponse { + entries: response.entries, + has_more: response.has_more, + })), + Err(e) => { + error!("Failed to get job logs: {}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: 
format!("Failed to get job logs: {}", e), + }), + )) + } + } +} + +/// Query parameters for listing available job run IDs +#[derive(Deserialize, JsonSchema)] +pub struct ListJobsQueryRequest { + pub start_date: Option<String>, + pub end_date: Option<String>, +} + +/// Response for list jobs endpoint +#[derive(Serialize, JsonSchema)] +pub struct ListJobsResponse { + pub job_run_ids: Vec<String>, +} + +pub async fn list_available_jobs( + axum::extract::Query(query): axum::extract::Query<ListJobsQueryRequest>, +) -> Result<Json<ListJobsResponse>, (StatusCode, Json<ErrorResponse>)> { + let log_reader = LogReader::default(); + + let date_range = if let (Some(start), Some(end)) = (query.start_date, query.end_date) { + Some((start, end)) + } else { + None + }; + + match log_reader.list_available_jobs(date_range) { + Ok(job_ids) => Ok(Json(ListJobsResponse { + job_run_ids: job_ids, + })), + Err(e) => { + error!("Failed to list available jobs: {}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to list available jobs: {}", e), + }), + )) + } + } +} + +/// Query parameters for metrics endpoint +#[derive(Deserialize, JsonSchema)] +pub struct MetricsQueryRequest { + #[serde(default = "default_time_range_hours")] + pub time_range_hours: u64, + #[serde(default)] + pub include_job_id_labels: bool, + #[serde(default = "default_max_cardinality")] + pub max_cardinality_per_metric: usize, +} + +fn default_time_range_hours() -> u64 { + 24 +} + +fn default_max_cardinality() -> usize { + 1000 +} + +/// Get Prometheus metrics from job logs +pub async fn get_prometheus_metrics( + axum::extract::Query(query): axum::extract::Query<MetricsQueryRequest>, +) -> Result<String, (StatusCode, Json<ErrorResponse>)> { + let config = MetricsConfig { + max_cardinality_per_metric: query.max_cardinality_per_metric, + time_range_hours: query.time_range_hours, + include_job_id_labels: query.include_job_id_labels, + max_jobs_per_metric: 100, + }; + + let aggregator = MetricsAggregator::new( + crate::log_collector::LogCollector::default_logs_dir(), + config + ); + + match aggregator.to_prometheus_format() { 
Ok(prometheus_output) => Ok(prometheus_output), + Err(e) => { + error!("Failed to generate Prometheus metrics: {}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to generate Prometheus metrics: {}", e), + }), + )) + } + } +} + +/// Get log-based metrics for a specific job run +pub async fn get_job_run_metrics( + Path(JobLogsPathRequest { job_run_id }): Path, +) -> Result>, (StatusCode, Json)> { + let log_reader = LogReader::default(); + + match log_reader.get_job_metrics(&job_run_id) { + Ok(metrics) => Ok(Json(metrics)), + Err(e) => { + error!("Failed to get job metrics: {}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to get job metrics: {}", e), + }), + )) + } + } } \ No newline at end of file diff --git a/databuild/service/mod.rs b/databuild/service/mod.rs index bd5cf82..b73d2e3 100644 --- a/databuild/service/mod.rs +++ b/databuild/service/mod.rs @@ -286,6 +286,11 @@ impl BuildGraphService { .api_route("/api/v1/tasks/:job_run_id/cancel", post(handlers::cancel_task)) .api_route("/api/v1/activity", get(handlers::get_activity_summary)) .api_route("/api/v1/analyze", post(handlers::analyze_build_graph)) + // Job logs and metrics endpoints + .api_route("/api/v1/logs/jobs", get(handlers::list_available_jobs)) + .api_route("/api/v1/logs/jobs/:job_run_id", get(handlers::get_job_logs)) + .api_route("/api/v1/logs/jobs/:job_run_id/metrics", get(handlers::get_job_run_metrics)) + .route("/api/v1/metrics", axum::routing::get(handlers::get_prometheus_metrics)) .route("/api/v1/openapi.json", get(Self::openapi_spec)) .with_state(Arc::new(self)) .finish_api(&mut api);