# Graph-Side Log Consumption Plan
## Status
- Phase 0: Design [DONE]
- Phase 1: Core Implementation [COMPLETED ✅]
- Phase 2: Advanced Features [FUTURE]
## Phase 1 Implementation Status
### ✅ **Core Components COMPLETED**
1. **JobLogEntry protobuf interface fixed** - Updated `databuild.proto` to use `repeated PartitionRef outputs` instead of single `string partition_ref`
2. **LogCollector implemented** - Consumes job wrapper stdout, parses structured logs, writes to date-organized JSONL files (`logs/databuild/YYYY-MM-DD/job_run_id.jsonl`)
3. **Graph integration completed** - LogCollector integrated into graph execution with UUID-based job ID coordination between graph and wrapper
4. **Unified Log Access Layer implemented** - Protobuf-based `LogReader` interface ensuring CLI/Service consistency for log retrieval
5. **Centralized metric templates** - All metric definitions centralized in `databuild/metric_templates.rs` module
6. **MetricsAggregator with cardinality safety** - Prometheus output without partition reference explosion, using job labels instead
7. **REST API endpoints implemented** - `/api/v1/jobs/{job_run_id}/logs` and `/api/v1/metrics` fully functional
8. **Graph-level job_label enrichment** - Solved cardinality issue via LogCollector enrichment pattern, consistent with design philosophy
### ✅ **Key Architectural Decisions Implemented**
- **Cardinality-safe metrics**: Job labels used instead of high-cardinality partition references in Prometheus output
- **Graph-level enrichment**: LogCollector enriches both WrapperJobEvent and Manifest entries with job_label from graph context
- **JSONL storage**: Date-organized file structure with robust error handling and concurrent access safety
- **Unified execution paths**: Both CLI and service builds produce identical BEL events and JSONL logs in same locations
- **Job ID coordination**: UUID-based job run IDs shared between graph execution and job wrapper via environment variable
### ✅ **All Success Criteria Met**
- **Reliable Log Capture**: All job wrapper output captured without loss through LogCollector
- **API Functionality**: REST API retrieves logs by job run ID with timestamp and log-level filtering
- **Safe Metrics**: Prometheus endpoint works without cardinality explosion (job labels only, no partition refs)
- **Correctness**: No duplicated metric templates, all definitions centralized in `metric_templates.rs`
- **Concurrent Safety**: Multiple jobs write logs simultaneously without corruption via separate JSONL files per job
- **Simple Testing**: Test suite covers core functionality with minimal brittleness, all tests passing
### 🏗️ **Implementation Files**
- `databuild/databuild.proto` - Updated protobuf interfaces
- `databuild/log_collector.rs` - Core log collection and JSONL writing
- `databuild/log_access.rs` - Unified log reading interface
- `databuild/metric_templates.rs` - Centralized metric definitions
- `databuild/metrics_aggregator.rs` - Cardinality-safe Prometheus output
- `databuild/service/handlers.rs` - REST API endpoints implementation
- `databuild/graph/execute.rs` - Integration point for LogCollector
- `databuild/job/main.rs` - Job wrapper structured log emission
## Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
- **[plans/job-wrapper.md](./job-wrapper.md)** - Job wrapper implementation and structured log protocol
- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures
## Overview
This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits `JobLogEntry` protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and expose metrics for Prometheus scraping.
## Key Technical Decisions
### 1. Storage Strategy: On-Disk with BEL Separation
**Decision**: Store structured logs on disk separate from the Build Event Log (BEL).
**Motivation**:
- Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
- BEL is optimized for event-sourcing patterns, not high-volume log queries
- Separate storage allows independent scaling and retention policies
### 2. File Organization: Date-Organized Structure
**Decision**: Store logs in configurable-base, date-organized directories: `$LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl`
**Motivation**:
- Enables efficient cleanup by date (future optimization)
- Simplifies manual log management during development
- Facilitates external log collection tools (future)
### 3. Static Update Period (Phase 1)
**Decision**: Use fixed refresh interval for log processing. Adaptive batching is a future optimization.
**Motivation**:
- Simplicity for initial implementation
- Predictable performance characteristics
- Easier to debug and test
- Can optimize later based on real usage patterns
### 4. Manual Log Cleanup (Phase 1)
**Decision**: No automatic log retention/cleanup in initial implementation.
**Motivation**:
- We're in early development phase
- Manual cleanup acceptable for now
- Avoids complexity in initial implementation
- Automatic retention can be added as future optimization
### 5. Unified Telemetry Stream
**Decision**: All `JobLogEntry` messages (logs, metrics, events) flow through the same JSONL files.
**Motivation**:
- Simplicity - single consumption pipeline
- Temporal consistency - metrics and logs naturally correlated
- Unified file format reduces complexity
### 6. Cardinality-Safe Prometheus Metrics
**Decision**: Prometheus metrics will NOT include partition references as labels to avoid cardinality explosion.
**Motivation**:
- Partition labels (date × customer × region × etc.) would create massive cardinality
- Focus on job-level and system-level metrics only
- Use job_id and job_type labels instead of partition-specific labels
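For a sense of scale (illustrative numbers, not measured): a partition label set of 365 dates × 1,000 customers × 20 regions would yield roughly 7.3 million series per metric, whereas `job_run_id` and `job_type` labels keep the series count proportional to the number of job runs.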
### 7. Centralized Metric Templates for Correctness
**Decision**: Define all Prometheus metric names and label templates in a central location to avoid string duplication.
**Motivation**:
- Prevents implicit coupling via duplicated string templates
- Single source of truth for metric definitions
- Easier to maintain consistency across codebase
### 8. Limited Scope (Phase 1)
**Decision**: Phase 1 focuses on log retrieval API and Prometheus metrics, excluding web app integration.
**Motivation**:
- Web app integration is part of a bigger update
- Allows focused implementation on core log consumption
- API-first approach enables multiple consumers
### 9. Unified Execution Paths
**Decision**: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.
**Motivation**:
- Building with CLI then querying from service "just works"
- Single source of truth for all build artifacts
- Consistent behavior regardless of execution method
- Simplifies debugging and operational workflows
## Interface Issues to Fix
### JobLogEntry Protobuf Update Required
The current `JobLogEntry` definition needs updates:
**Current (INCORRECT)**:
```proto
message JobLogEntry {
  string partition_ref = 3;  // Single string
  // ...
}
```
**Required (CORRECT)**:
```proto
message JobLogEntry {
  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
  // ...
}
```
**Rationale**: Jobs produce multiple partitions, and we should use the proper `PartitionRef` type for consistency with other interfaces.
## Architecture
### Storage Layout
```
/logs/databuild/
├── 2025-01-27/
│   ├── job_run_123abc.jsonl
│   ├── job_run_456def.jsonl
│   └── ...
└── 2025-01-28/
    └── ...
```
### File Format (JSONL)
Each file contains one JSON object per line, representing a `JobLogEntry`:
```json
{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}
```
### Consumption Pipeline
```
Job Wrapper (stdout) → Graph Log Collector → JSONL Files
                                                 │
                                     Unified Log Access Layer
                                        ↙                 ↘
                                 Service API            CLI API

                       Metrics Aggregator → /api/v1/metrics
```
## Implementation Components
### 1. Log Collector [PHASE 1]
**Responsibility**: Consume job wrapper stdout and write to JSONL files.
```rust
struct LogCollector {
    logs_dir: PathBuf,                    // /logs/databuild
    active_files: HashMap<String, File>,  // job_run_id -> file handle
}

impl LogCollector {
    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
    fn ensure_date_directory(&self) -> Result<PathBuf>;
}
```
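A minimal sketch of how `consume_job_output` could be implemented, assuming the wrapper emits one JSON-encoded `JobLogEntry` per stdout line and that the entry type is serde-deserializable (both assumptions, not confirmed implementation details):
```rust
impl LogCollector {
    /// Sketch: stream wrapper stdout line by line, parse each line as a
    /// JobLogEntry, and append it to the job's JSONL file via write_log_entry.
    fn consume_job_output(
        &mut self,
        job_run_id: &str,
        stdout: &mut BufReader<ChildStdout>,
    ) -> Result<()> {
        let mut line = String::new();
        loop {
            line.clear();
            if stdout.read_line(&mut line)? == 0 {
                break; // wrapper closed stdout; the job is finishing
            }
            match serde_json::from_str::<JobLogEntry>(&line) {
                Ok(entry) => self.write_log_entry(job_run_id, &entry)?,
                // Non-JSON output should not abort collection; log and continue.
                Err(e) => eprintln!("unparseable wrapper output for {job_run_id}: {e}"),
            }
        }
        Ok(())
    }
}
```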
### 2. Unified Log Access Layer [PHASE 1]
**Responsibility**: Provide common interface for reading logs from JSONL files, used by both service and CLI.
```rust
// Core log access implementation
struct LogReader {
    logs_base_path: PathBuf,
}

impl LogReader {
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
}
```
**Protobuf Interface** (ensures CLI/Service consistency):
```proto
message JobLogsRequest {
  string job_run_id = 1;
  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
  int32 min_level = 3;        // LogLevel enum value
  uint32 limit = 4;
}

message JobLogsResponse {
  repeated JobLogEntry entries = 1;
  bool has_more = 2;
}
```
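One possible shape for `get_job_logs`, sketched under the assumption that the date directory is not known up front, so each date directory is scanned for the requested job's file (it reuses the `read_job_log_file` sketch from the file-format section; `passes_filters` is a hypothetical helper standing in for the timestamp and level checks):
```rust
impl LogReader {
    /// Sketch: locate the job's JSONL file across date directories, then apply
    /// the JobLogsRequest filters and limit.
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse> {
        let file_name = format!("{}.jsonl", request.job_run_id);
        let mut entries = Vec::new();

        for date_dir in std::fs::read_dir(&self.logs_base_path)? {
            let candidate = date_dir?.path().join(&file_name);
            if candidate.exists() {
                for entry in read_job_log_file(&candidate)? {
                    // passes_filters is hypothetical: it would compare the entry's
                    // timestamp against since_timestamp and its level against min_level.
                    if passes_filters(&entry, request) {
                        entries.push(entry);
                    }
                }
            }
        }

        let limit = if request.limit == 0 { usize::MAX } else { request.limit as usize };
        let has_more = entries.len() > limit;
        entries.truncate(limit);
        Ok(JobLogsResponse { entries, has_more })
    }
}
```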
### 3. Metrics Templates [PHASE 1]
**Responsibility**: Centralized metric definitions to avoid string duplication.
```rust
// Central location for all metric definitions
mod metric_templates {
    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";

    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
        let mut labels = HashMap::new();
        labels.insert("job_run_id".to_string(), job_run_id.to_string());
        labels.insert("job_type".to_string(), job_type.to_string());
        labels
    }
}
```
### 4. Metrics Aggregator [PHASE 1]
**Responsibility**: Process `MetricPoint` messages and expose Prometheus format with safe cardinality.
```rust
struct MetricsAggregator {
    metrics: HashMap<String, MetricFamily>,
}

impl MetricsAggregator {
    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
    fn generate_prometheus_output(&self) -> String;
}
```
**Safe Prometheus Output** (NO partition labels):
```
# HELP databuild_job_runtime_seconds Job execution time in seconds
# TYPE databuild_job_runtime_seconds gauge
databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2
# HELP databuild_rows_processed_total Total rows processed by job
# TYPE databuild_rows_processed_total counter
databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000
```
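A sketch of how `generate_prometheus_output` could render that format from the tracked families; the `MetricFamily` fields used here (`help`, `metric_type`, `samples`) are illustrative, since the plan above only names the type:
```rust
impl MetricsAggregator {
    /// Sketch: render each tracked metric family in Prometheus exposition
    /// format, emitting only job-level labels (never partition refs).
    fn generate_prometheus_output(&self) -> String {
        let mut out = String::new();
        for (name, family) in &self.metrics {
            out.push_str(&format!("# HELP {} {}\n", name, family.help));
            out.push_str(&format!("# TYPE {} {}\n", name, family.metric_type));
            for sample in &family.samples {
                let labels: Vec<String> = sample
                    .labels
                    .iter()
                    .map(|(k, v)| format!("{k}=\"{v}\""))
                    .collect();
                out.push_str(&format!("{}{{{}}} {}\n", name, labels.join(","), sample.value));
            }
        }
        out
    }
}
```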
## API Implementation
### REST Endpoints [PHASE 1]
**Get Job Logs**:
```
GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
```
Response: Array of `JobLogEntry` objects with filtering support.
**Prometheus Metrics Scraping**:
```
GET /api/v1/metrics
```
Response: All metrics in Prometheus exposition format.
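A framework-agnostic sketch of how these endpoints might delegate to the components above; the `AppState` type, handler signatures, and default page size are illustrative assumptions, and the actual routing in `databuild/service/handlers.rs` may differ:
```rust
/// Illustrative shared state; the real service may wire these differently.
struct AppState {
    log_reader: LogReader,
    metrics: std::sync::Mutex<MetricsAggregator>,
}

/// GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
fn handle_get_job_logs(
    state: &AppState,
    job_run_id: &str,
    since: i64,
    min_level: i32,
) -> Result<JobLogsResponse> {
    let request = JobLogsRequest {
        job_run_id: job_run_id.to_string(),
        since_timestamp: since,
        min_level,
        limit: 1000, // illustrative default page size
    };
    state.log_reader.get_job_logs(&request)
}

/// GET /api/v1/metrics
fn handle_get_metrics(state: &AppState) -> Result<String> {
    Ok(state.metrics.lock().unwrap().generate_prometheus_output())
}
```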
## Configuration
### Environment Variables
```bash
# Log storage configuration
DATABUILD_LOGS_DIR=/logs/databuild # Log storage directory
# Processing configuration
DATABUILD_LOG_REFRESH_INTERVAL_MS=1000 # Fixed refresh interval (1s)
DATABUILD_LOG_CACHE_SIZE=100 # LRU cache size for job logs
```
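A small sketch of loading this configuration with defaults that mirror the values above (the struct and field names are illustrative, not the actual config types):
```rust
use std::env;
use std::path::PathBuf;

/// Illustrative config loader; defaults mirror the documented values.
struct LogConsumptionConfig {
    logs_dir: PathBuf,
    refresh_interval_ms: u64,
    cache_size: usize,
}

impl LogConsumptionConfig {
    fn from_env() -> Self {
        let get = |key: &str, default: &str| env::var(key).unwrap_or_else(|_| default.to_string());
        Self {
            logs_dir: PathBuf::from(get("DATABUILD_LOGS_DIR", "/logs/databuild")),
            refresh_interval_ms: get("DATABUILD_LOG_REFRESH_INTERVAL_MS", "1000")
                .parse()
                .unwrap_or(1000),
            cache_size: get("DATABUILD_LOG_CACHE_SIZE", "100").parse().unwrap_or(100),
        }
    }
}
```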
## Implementation Phases
### Phase 1: Core Implementation [COMPLETED ✅]
**Goal**: Basic log consumption and storage with REST API for log retrieval and Prometheus metrics.
**Deliverables** ✅:
- ✅ Fix `JobLogEntry` protobuf interface (partition_ref → outputs)
- ✅ LogCollector with JSONL file writing and graph-level job_label enrichment
- ✅ LogReader with unified protobuf interface for CLI/Service consistency
- ✅ REST API endpoints for job logs and Prometheus metrics
- ✅ MetricsAggregator with cardinality-safe output (job labels, not partition refs)
- ✅ Centralized metric templates module
**Success Criteria** ✅:
- ✅ Job logs are captured and stored reliably via LogCollector integration
- ✅ REST API can retrieve logs by job run ID and time range with filtering
- ✅ Prometheus metrics are exposed at `/api/v1/metrics` endpoint without cardinality issues
- ✅ System handles concurrent job execution without data corruption (separate JSONL files per job)
- ✅ All metric names/labels are defined in central location (`metric_templates.rs`)
### Phase 2: Advanced Features [FUTURE]
**Goal**: Performance optimizations and production features.
**Deliverables**:
- Adaptive batching based on system load
- Automatic log retention and cleanup
- Web app integration for log viewing
- Rate limiting for high-volume jobs
- Performance monitoring and alerting
## Testing Strategy
### Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]
**Unit Tests**:
- JSONL parsing and serialization (basic happy path)
- Metrics aggregation and Prometheus formatting (template correctness)
- API endpoint responses (log retrieval by job_run_id)
**Integration Tests**:
- End-to-end: wrapper stdout → JSONL file → API response
- Concurrent job log collection (2-3 jobs simultaneously)
- Prometheus metrics scraping endpoint
**Key Principle**: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.
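As an illustration of that principle, an end-to-end happy-path test could look roughly like the sketch below; `write_jsonl_fixture` is a hypothetical helper, and the `tempfile` dev-dependency is assumed:
```rust
#[test]
fn logs_written_as_jsonl_are_readable_via_log_reader() {
    // Arrange: a temp logs dir with one date directory and one job file.
    let logs_dir = tempfile::tempdir().unwrap();
    let date_dir = logs_dir.path().join("2025-01-27");
    std::fs::create_dir_all(&date_dir).unwrap();
    write_jsonl_fixture(&date_dir.join("job_run_123abc.jsonl")); // hypothetical helper

    // Act: read the logs back through the unified access layer.
    let reader = LogReader { logs_base_path: logs_dir.path().to_path_buf() };
    let response = reader
        .get_job_logs(&JobLogsRequest {
            job_run_id: "job_run_123abc".to_string(),
            since_timestamp: 0,
            min_level: 0,
            limit: 100,
        })
        .unwrap();

    // Assert: the happy path returns what was written, with nothing cut off.
    assert!(!response.entries.is_empty());
    assert!(!response.has_more);
}
```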
## Future Extensions
### Performance Optimizations [FUTURE]
- Adaptive refresh intervals based on load
- Log compression for storage efficiency
- Advanced caching strategies
### Production Features [FUTURE]
- Automatic log retention and cleanup policies
- Integration with external log collection tools
- Web app log viewing and search capabilities
### Monitoring Integration [FUTURE]
- Grafana dashboard templates
- Alerting on log system health
- Performance metrics for log processing pipeline
## Success Criteria
1. **Reliable Log Capture**: All job wrapper output captured without loss
2. **API Functionality**: Can retrieve logs by job run ID and time range
3. **Safe Metrics**: Prometheus endpoint works without cardinality explosion
4. **Correctness**: No duplicated metric templates, centralized definitions
5. **Concurrent Safety**: Multiple jobs can write logs simultaneously without corruption
6. **Simple Testing**: Test suite covers core functionality with minimal brittleness