# Graph-Side Log Consumption Plan

## Status

- Phase 0: Design [DONE]
- Phase 1: Core Implementation [COMPLETED ✅]
- Phase 2: Advanced Features [FUTURE]

## Phase 1 Implementation Status

### ✅ **Core Components COMPLETED**

1. **JobLogEntry protobuf interface fixed** - Updated `databuild.proto` to use `repeated PartitionRef outputs` instead of a single `string partition_ref`
2. **LogCollector implemented** - Consumes job wrapper stdout, parses structured logs, and writes to date-organized JSONL files (`logs/databuild/YYYY-MM-DD/job_run_id.jsonl`)
3. **Graph integration completed** - LogCollector integrated into graph execution with UUID-based job ID coordination between graph and wrapper
4. **Unified Log Access Layer implemented** - Protobuf-based `LogReader` interface ensuring CLI/Service consistency for log retrieval
5. **Centralized metric templates** - All metric definitions centralized in the `databuild/metric_templates.rs` module
6. **MetricsAggregator with cardinality safety** - Prometheus output without partition reference explosion, using job labels instead
7. **REST API endpoints implemented** - `/api/v1/jobs/{job_run_id}/logs` and `/api/v1/metrics` fully functional
8. **Graph-level job_label enrichment** - Solved the cardinality issue via the LogCollector enrichment pattern, consistent with the design philosophy

### ✅ **Key Architectural Decisions Implemented**

- **Cardinality-safe metrics**: Job labels used instead of high-cardinality partition references in Prometheus output
- **Graph-level enrichment**: LogCollector enriches both WrapperJobEvent and Manifest entries with job_label from graph context
- **JSONL storage**: Date-organized file structure with robust error handling and concurrent access safety
- **Unified execution paths**: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations
- **Job ID coordination**: UUID-based job run IDs shared between graph execution and job wrapper via an environment variable (see the sketch after the file list below)

### ✅ **All Success Criteria Met**

- ✅ **Reliable Log Capture**: All job wrapper output captured without loss through LogCollector
- ✅ **API Functionality**: REST API retrieves logs by job run ID, with timestamp and log level filtering
- ✅ **Safe Metrics**: Prometheus endpoint works without cardinality explosion (job labels only, no partition refs)
- ✅ **Correctness**: No duplicated metric templates; all definitions centralized in `metric_templates.rs`
- ✅ **Concurrent Safety**: Multiple jobs write logs simultaneously without corruption via separate JSONL files per job
- ✅ **Simple Testing**: Test suite covers core functionality with minimal brittleness; all tests passing

### 🏗️ **Implementation Files**

- `databuild/databuild.proto` - Updated protobuf interfaces
- `databuild/log_collector.rs` - Core log collection and JSONL writing
- `databuild/log_access.rs` - Unified log reading interface
- `databuild/metric_templates.rs` - Centralized metric definitions
- `databuild/metrics_aggregator.rs` - Cardinality-safe Prometheus output
- `databuild/service/handlers.rs` - REST API endpoint implementations
- `databuild/graph/execute.rs` - Integration point for LogCollector
- `databuild/job/main.rs` - Job wrapper structured log emission
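For illustration, the job ID coordination decision might look like the following sketch. The `uuid` crate, the `DATABUILD_JOB_RUN_ID` variable name, and the wrapper invocation shape are assumptions for illustration, not the actual `graph/execute.rs` code:

```rust
use std::process::{Child, Command, Stdio};
use uuid::Uuid; // assumed dependency (v4 feature)

// Hypothetical sketch: env var name and wrapper binary are illustrative only.
fn spawn_job_wrapper(wrapper_bin: &str) -> std::io::Result<(String, Child)> {
    // The graph generates the job run ID up front, so graph and wrapper agree on it.
    let job_run_id = Uuid::new_v4().to_string();
    let child = Command::new(wrapper_bin)
        .env("DATABUILD_JOB_RUN_ID", &job_run_id) // assumed variable name
        .stdout(Stdio::piped()) // the LogCollector consumes this pipe
        .spawn()?;
    Ok((job_run_id, child))
}
```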
## Required Reading

Before implementing this plan, engineers should thoroughly understand these design documents:

- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
- **[plans/job-wrapper.md](./job-wrapper.md)** - Job wrapper implementation and structured log protocol
- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures

## Overview

This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits `JobLogEntry` protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and expose metrics for Prometheus scraping.

## Key Technical Decisions

### 1. Storage Strategy: On-Disk with BEL Separation

**Decision**: Store structured logs on disk, separate from the Build Event Log (BEL).

**Motivation**:
- Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
- The BEL is optimized for event-sourcing patterns, not high-volume log queries
- Separate storage allows independent scaling and retention policies

### 2. File Organization: Date-Organized Structure

**Decision**: Store logs in configurable-base, date-organized directories: `$LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl`

**Motivation**:
- Enables efficient cleanup by date (future optimization)
- Simplifies manual log management during development
- Facilitates external log collection tools (future)
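A minimal sketch of this path scheme, assuming the `chrono` crate for date formatting; the helper name is illustrative:

```rust
use std::path::{Path, PathBuf};
use chrono::Utc; // assumed dependency

// Builds the date-organized JSONL path described above.
fn jsonl_path(logs_base: &Path, job_run_id: &str) -> PathBuf {
    let date = Utc::now().format("%Y-%m-%d").to_string(); // e.g. "2025-01-27"
    logs_base.join(date).join(format!("{job_run_id}.jsonl"))
}
```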
### 3. Static Update Period (Phase 1)

**Decision**: Use a fixed refresh interval for log processing. Adaptive batching is a future optimization.

**Motivation**:
- Simplicity for the initial implementation
- Predictable performance characteristics
- Easier to debug and test
- Can optimize later based on real usage patterns

### 4. Manual Log Cleanup (Phase 1)

**Decision**: No automatic log retention/cleanup in the initial implementation.

**Motivation**:
- We're in an early development phase
- Manual cleanup is acceptable for now
- Avoids complexity in the initial implementation
- Automatic retention can be added as a future optimization

### 5. Unified Telemetry Stream

**Decision**: All `JobLogEntry` messages (logs, metrics, events) flow through the same JSONL files.

**Motivation**:
- Simplicity - single consumption pipeline
- Temporal consistency - metrics and logs are naturally correlated
- A unified file format reduces complexity

### 6. Cardinality-Safe Prometheus Metrics

**Decision**: Prometheus metrics will NOT include partition references as labels, to avoid cardinality explosion.

**Motivation**:
- Partition labels (date × customer × region × etc.) would create massive cardinality
- Focus on job-level and system-level metrics only
- Use job_id and job_type labels instead of partition-specific labels

### 7. Centralized Metric Templates for Correctness

**Decision**: Define all Prometheus metric names and label templates in a central location to avoid string duplication.

**Motivation**:
- Prevents implicit coupling via duplicated string templates
- Single source of truth for metric definitions
- Easier to maintain consistency across the codebase

### 8. Limited Scope (Phase 1)

**Decision**: Phase 1 focuses on the log retrieval API and Prometheus metrics, excluding web app integration.

**Motivation**:
- Web app integration is part of a bigger update
- Allows a focused implementation of core log consumption
- An API-first approach enables multiple consumers

### 9. Unified Execution Paths

**Decision**: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.

**Motivation**:
- Building with the CLI and then querying from the service "just works"
- Single source of truth for all build artifacts
- Consistent behavior regardless of execution method
- Simplifies debugging and operational workflows

## Interface Issues to Fix

### JobLogEntry Protobuf Update Required

The current `JobLogEntry` definition needs updates:

**Current (INCORRECT)**:
```proto
message JobLogEntry {
  string partition_ref = 3;  // Single string
  // ...
}
```

**Required (CORRECT)**:
```proto
message JobLogEntry {
  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
  // ...
}
```

**Rationale**: Jobs produce multiple partitions, and we should use the proper `PartitionRef` type for consistency with other interfaces.

## Architecture

### Storage Layout

```
/logs/databuild/
├── 2025-01-27/
│   ├── job_run_123abc.jsonl
│   ├── job_run_456def.jsonl
│   └── ...
├── 2025-01-28/
│   └── ...
```

### File Format (JSONL)

Each file contains one JSON object per line, representing a `JobLogEntry`:

```json
{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}
```

### Consumption Pipeline

```
Job Wrapper (stdout) → Graph Log Collector → JSONL Files
                                                 ↓
                                    Unified Log Access Layer
                                         ↙             ↘
                                  Service API         CLI API
                                       ↓
                          Metrics Aggregator → /api/v1/metrics
```

## Implementation Components

### 1. Log Collector [PHASE 1]

**Responsibility**: Consume job wrapper stdout and write to JSONL files.

```rust
struct LogCollector {
    logs_dir: PathBuf,                   // /logs/databuild
    active_files: HashMap<String, File>, // job_run_id -> file handle
}

impl LogCollector {
    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
    fn ensure_date_directory(&self) -> Result<PathBuf>;
}
```

### 2. Unified Log Access Layer [PHASE 1]

**Responsibility**: Provide a common interface for reading logs from JSONL files, used by both the service and the CLI.

```rust
// Core log access implementation
struct LogReader {
    logs_base_path: PathBuf,
}

impl LogReader {
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
}
```

**Protobuf Interface** (ensures CLI/Service consistency):

```proto
message JobLogsRequest {
  string job_run_id = 1;
  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
  int32 min_level = 3;        // LogLevel enum value
  uint32 limit = 4;
}

message JobLogsResponse {
  repeated JobLogEntry entries = 1;
  bool has_more = 2;
}
```
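To make the read path concrete, here is a minimal sketch of how `get_job_logs` might scan a JSONL file, assuming `serde_json` for line parsing; it applies only the `limit` field and elides `since_timestamp`/`min_level` filtering, and the actual `log_access.rs` may differ:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

// Sketch only: returns raw JSON values rather than decoded JobLogEntry protos.
fn read_job_logs(
    path: &Path,
    limit: usize,
) -> Result<(Vec<serde_json::Value>, bool), Box<dyn std::error::Error>> {
    let reader = BufReader::new(File::open(path)?);
    let mut entries = Vec::new();
    let mut has_more = false;
    for line in reader.lines() {
        if entries.len() == limit {
            has_more = true; // more lines remain beyond the requested page
            break;
        }
        entries.push(serde_json::from_str(&line?)?);
    }
    Ok((entries, has_more))
}
```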
### 3. Metrics Templates [PHASE 1]

**Responsibility**: Centralized metric definitions to avoid string duplication.

```rust
// Central location for all metric definitions
mod metric_templates {
    use std::collections::HashMap;

    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";

    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
        let mut labels = HashMap::new();
        labels.insert("job_run_id".to_string(), job_run_id.to_string());
        labels.insert("job_type".to_string(), job_type.to_string());
        labels
    }
}
```

### 4. Metrics Aggregator [PHASE 1]

**Responsibility**: Process `MetricPoint` messages and expose Prometheus format with safe cardinality.

```rust
struct MetricsAggregator {
    // Keyed by metric name plus rendered label set; the exact value
    // representation is an implementation detail
    metrics: HashMap<String, f64>,
}

impl MetricsAggregator {
    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
    fn generate_prometheus_output(&self) -> String;
}
```

**Safe Prometheus Output** (NO partition labels):

```
# HELP databuild_job_runtime_seconds Job execution time in seconds
# TYPE databuild_job_runtime_seconds gauge
databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2

# HELP databuild_rows_processed_total Total rows processed by job
# TYPE databuild_rows_processed_total counter
databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000
```

## API Implementation

### REST Endpoints [PHASE 1]

**Get Job Logs**:
```
GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
```
Response: Array of `JobLogEntry` objects with filtering support.

**Prometheus Metrics Scraping**:
```
GET /api/v1/metrics
```
Response: All metrics in Prometheus exposition format.

## Configuration

### Environment Variables

```bash
# Log storage configuration
DATABUILD_LOGS_DIR=/logs/databuild        # Log storage directory

# Processing configuration
DATABUILD_LOG_REFRESH_INTERVAL_MS=1000    # Fixed refresh interval (1s)
DATABUILD_LOG_CACHE_SIZE=100              # LRU cache size for job logs
```
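A sketch of loading these values with the documented defaults; the struct and function names here are illustrative, not the actual configuration code:

```rust
use std::path::PathBuf;
use std::time::Duration;

// Illustrative only: struct and field names are assumptions.
struct LogConfig {
    logs_dir: PathBuf,
    refresh_interval: Duration,
    cache_size: usize,
}

fn load_config() -> LogConfig {
    let logs_dir = std::env::var("DATABUILD_LOGS_DIR")
        .unwrap_or_else(|_| "/logs/databuild".to_string())
        .into();
    let refresh_ms: u64 = std::env::var("DATABUILD_LOG_REFRESH_INTERVAL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1000); // documented default: 1s
    let cache_size = std::env::var("DATABUILD_LOG_CACHE_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(100); // documented default: 100 entries
    LogConfig {
        logs_dir,
        refresh_interval: Duration::from_millis(refresh_ms),
        cache_size,
    }
}
```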
## Implementation Phases

### Phase 1: Core Implementation [COMPLETED ✅]

**Goal**: Basic log consumption and storage, with a REST API for log retrieval and Prometheus metrics.

**Deliverables** ✅:
- ✅ Fix the `JobLogEntry` protobuf interface (partition_ref → outputs)
- ✅ LogCollector with JSONL file writing and graph-level job_label enrichment
- ✅ LogReader with a unified protobuf interface for CLI/Service consistency
- ✅ REST API endpoints for job logs and Prometheus metrics
- ✅ MetricsAggregator with cardinality-safe output (job labels, not partition refs)
- ✅ Centralized metric templates module

**Success Criteria** ✅:
- ✅ Job logs are captured and stored reliably via LogCollector integration
- ✅ The REST API can retrieve logs by job run ID and time range, with filtering
- ✅ Prometheus metrics are exposed at the `/api/v1/metrics` endpoint without cardinality issues
- ✅ The system handles concurrent job execution without data corruption (separate JSONL files per job)
- ✅ All metric names/labels are defined in a central location (`metric_templates.rs`)

### Phase 2: Advanced Features [FUTURE]

**Goal**: Performance optimizations and production features.

**Deliverables**:
- Adaptive batching based on system load
- Automatic log retention and cleanup
- Web app integration for log viewing
- Rate limiting for high-volume jobs
- Performance monitoring and alerting

## Testing Strategy

### Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]

**Unit Tests**:
- JSONL parsing and serialization (basic happy path)
- Metrics aggregation and Prometheus formatting (template correctness)
- API endpoint responses (log retrieval by job_run_id)

**Integration Tests** (a minimal end-to-end sketch appears at the end of this plan):
- End-to-end: wrapper stdout → JSONL file → API response
- Concurrent job log collection (2-3 jobs simultaneously)
- Prometheus metrics scraping endpoint

**Key Principle**: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.

## Future Extensions

### Performance Optimizations [FUTURE]
- Adaptive refresh intervals based on load
- Log compression for storage efficiency
- Advanced caching strategies

### Production Features [FUTURE]
- Automatic log retention and cleanup policies
- Integration with external log collection tools
- Web app log viewing and search capabilities

### Monitoring Integration [FUTURE]
- Grafana dashboard templates
- Alerting on log system health
- Performance metrics for log processing pipeline

## Success Criteria

1. **Reliable Log Capture**: All job wrapper output is captured without loss
2. **API Functionality**: Logs can be retrieved by job run ID and time range
3. **Safe Metrics**: The Prometheus endpoint works without cardinality explosion
4. **Correctness**: No duplicated metric templates; definitions are centralized
5. **Concurrent Safety**: Multiple jobs can write logs simultaneously without corruption
6. **Simple Testing**: The test suite covers core functionality with minimal brittleness
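For reference, the end-to-end integration test from the Testing Strategy might look roughly like this sketch, assuming `tempfile` and `serde_json` as dev-dependencies and using a raw JSON line in place of real wrapper output:

```rust
#[cfg(test)]
mod tests {
    use std::io::Write;

    // Sketch of the wrapper-stdout → JSONL → read-back round trip.
    #[test]
    fn job_log_round_trip() {
        let dir = tempfile::tempdir().unwrap(); // isolated temp logs dir
        let path = dir.path().join("job_run_123abc.jsonl");
        let line = r#"{"job_id":"job_run_123abc","sequence_number":1}"#;
        writeln!(std::fs::File::create(&path).unwrap(), "{line}").unwrap();

        // Read the file back the way a LogReader would: one JSON value per line.
        let entries: Vec<serde_json::Value> = std::fs::read_to_string(&path)
            .unwrap()
            .lines()
            .map(|l| serde_json::from_str(l).unwrap())
            .collect();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0]["job_id"], "job_run_123abc");
    }
}
```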