Add plan for graph-side log consumption

Stuart Axelbrooke 2025-07-28 20:56:07 -07:00
parent 3c4d3d89db
commit cccfbd1133
3 changed files with 393 additions and 19 deletions

@@ -23,6 +23,7 @@ Please reference these for any related work, as they indicate key technical bias
- We are building for the future, and choose to do "the right thing" rather than taking shortcuts to get unstuck. If you get stuck, pause and ask for help/input.
- Do not add "unknown" results when parses or matches fail - these should always throw.
- Compile-time correctness is a super-power, and investment in it speeds up the flywheel for development and user value.
- **CLI/Service Interchangeability**: Both the CLI and service must produce identical artifacts (BEL events, logs, metrics, outputs) in the same locations. Users should be able to build with one interface and query/inspect results from the other seamlessly. This principle applies to all DataBuild operations, not just builds.
## Build & Test
```bash

@@ -0,0 +1,347 @@
# Graph-Side Log Consumption Plan
## Status
- Phase 0: Design [DONE]
- Phase 1: Core Implementation [FUTURE]
- Phase 2: Advanced Features [FUTURE]
## Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
- **[plans/job-wrapper.md](./job-wrapper.md)** - Job wrapper implementation and structured log protocol
- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures
## Overview
This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits `JobLogEntry` protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and expose metrics for Prometheus scraping.
## Key Technical Decisions
### 1. Storage Strategy: On-Disk with BEL Separation
**Decision**: Store structured logs on disk separate from the Build Event Log (BEL).
**Motivation**:
- Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
- BEL is optimized for event-sourcing patterns, not high-volume log queries
- Separate storage allows independent scaling and retention policies
### 2. File Organization: Date-Organized Structure
**Decision**: Store logs in configurable-base, date-organized directories: `$LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl`
**Motivation**:
- Enables efficient cleanup by date (future optimization)
- Simplifies manual log management during development
- Facilitates external log collection tools (future)
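As a sketch of the layout above, the per-run log path can be assembled with `std::path`; the `YYYY-MM-DD` date string is assumed to be computed by the caller (e.g. from the job's start time in UTC), and the function name is illustrative:

```rust
use std::path::{Path, PathBuf};

// Illustrative sketch: derive the JSONL path for a job run under the
// configurable base path. Assumes the caller supplies the UTC date.
fn job_log_path(logs_basepath: &Path, date: &str, job_run_id: &str) -> PathBuf {
    logs_basepath.join(date).join(format!("{job_run_id}.jsonl"))
}

fn main() {
    let p = job_log_path(Path::new("/logs/databuild"), "2025-01-27", "job_run_123abc");
    println!("{}", p.display());
}
```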
### 3. Static Update Period (Phase 1)
**Decision**: Use fixed refresh interval for log processing. Adaptive batching is a future optimization.
**Motivation**:
- Simplicity for initial implementation
- Predictable performance characteristics
- Easier to debug and test
- Can optimize later based on real usage patterns
### 4. Manual Log Cleanup (Phase 1)
**Decision**: No automatic log retention/cleanup in initial implementation.
**Motivation**:
- We're in early development phase
- Manual cleanup acceptable for now
- Avoids complexity in initial implementation
- Automatic retention can be added as future optimization
### 5. Unified Telemetry Stream
**Decision**: All `JobLogEntry` messages (logs, metrics, events) flow through the same JSONL files.
**Motivation**:
- Simplicity - single consumption pipeline
- Temporal consistency - metrics and logs naturally correlated
- Unified file format reduces complexity
### 6. Cardinality-Safe Prometheus Metrics
**Decision**: Prometheus metrics will NOT include partition references as labels to avoid cardinality explosion.
**Motivation**:
- Partition labels (date × customer × region × etc.) would create massive cardinality
- Focus on job-level and system-level metrics only
- Use job_id and job_type labels instead of partition-specific labels
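One way to enforce this is an allow-list filter applied before export; the sketch below assumes a hypothetical label set (`job_run_id`, `job_type`, `stage`), not the final list:

```rust
use std::collections::HashMap;

// Illustrative sketch: keep only an allow-list of low-cardinality label
// names before export, dropping anything partition-scoped.
fn sanitize_labels(labels: &HashMap<String, String>) -> HashMap<String, String> {
    const ALLOWED: [&str; 3] = ["job_run_id", "job_type", "stage"];
    labels
        .iter()
        .filter(|(k, _)| ALLOWED.contains(&k.as_str()))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

fn main() {
    let mut labels = HashMap::new();
    labels.insert("job_type".to_string(), "transform".to_string());
    labels.insert("partition".to_string(), "date=2025-01-27/customer=acme".to_string());
    let safe = sanitize_labels(&labels);
    assert!(safe.contains_key("job_type"));
    assert!(!safe.contains_key("partition"));
}
```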
### 7. Centralized Metric Templates for Correctness
**Decision**: Define all Prometheus metric names and label templates in a central location to avoid string duplication.
**Motivation**:
- Prevents implicit coupling via duplicated string templates
- Single source of truth for metric definitions
- Easier to maintain consistency across codebase
### 8. Limited Scope (Phase 1)
**Decision**: Phase 1 focuses on log retrieval API and Prometheus metrics, excluding web app integration.
**Motivation**:
- Web app integration is part of a bigger update
- Allows focused implementation on core log consumption
- API-first approach enables multiple consumers
### 9. Unified Execution Paths
**Decision**: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.
**Motivation**:
- Building with CLI then querying from service "just works"
- Single source of truth for all build artifacts
- Consistent behavior regardless of execution method
- Simplifies debugging and operational workflows
## Interface Issues to Fix
### JobLogEntry Protobuf Update Required
The current `JobLogEntry` definition needs updates:
**Current (INCORRECT)**:
```proto
message JobLogEntry {
  string partition_ref = 3;  // Single string
  // ...
}
```
**Required (CORRECT)**:
```proto
message JobLogEntry {
  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
  // ...
}
```
**Rationale**: Jobs produce multiple partitions, and we should use the proper `PartitionRef` type for consistency with other interfaces.
## Architecture
### Storage Layout
```
/logs/databuild/
├── 2025-01-27/
│ ├── job_run_123abc.jsonl
│ ├── job_run_456def.jsonl
│ └── ...
├── 2025-01-28/
│ └── ...
```
### File Format (JSONL)
Each file contains one JSON object per line, representing a `JobLogEntry`:
```json
{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}
```
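The `sequence_number` field above makes a cheap integrity check possible when a file is read back; a minimal sketch, assuming sequence numbers start at 1 per job run:

```rust
// Illustrative integrity check: sequence_number values for one job run
// should form a gapless 1..N sequence when the JSONL file is read back.
fn sequence_is_contiguous(seqs: &[u64]) -> bool {
    seqs.iter().enumerate().all(|(i, &s)| s == i as u64 + 1)
}

fn main() {
    assert!(sequence_is_contiguous(&[1, 2, 3]));
    assert!(!sequence_is_contiguous(&[1, 3])); // entry 2 was lost
}
```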
### Consumption Pipeline
```
Job Wrapper (stdout) → Graph Log Collector → JSONL Files
                                                  ↓
                                    Unified Log Access Layer
                                        ↙               ↘
                                 Service API          CLI API

                       Metrics Aggregator → /api/v1/metrics
```
## Implementation Components
### 1. Log Collector [PHASE 1]
**Responsibility**: Consume job wrapper stdout and write to JSONL files.
```rust
struct LogCollector {
    logs_dir: PathBuf,                   // /logs/databuild
    active_files: HashMap<String, File>, // job_run_id -> file handle
}

impl LogCollector {
    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
    fn ensure_date_directory(&self) -> Result<PathBuf>;
}
```
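The write path can be sketched with the standard library alone; this assumes the entry is already serialized to one line of JSON (names like `append_entry` are illustrative, not the final API):

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;

// Sketch of the append path: one line per serialized JobLogEntry.
// Creating the date directory lazily covers runs that cross midnight.
fn append_entry(logs_dir: &Path, date: &str, job_run_id: &str, entry_json: &str) -> std::io::Result<()> {
    let dir = logs_dir.join(date);
    fs::create_dir_all(&dir)?;
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(dir.join(format!("{job_run_id}.jsonl")))?;
    writeln!(file, "{entry_json}")
}

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir().join("databuild_log_demo");
    append_entry(&base, "2025-01-27", "job_run_123abc", r#"{"sequence_number":1}"#)?;
    let content = fs::read_to_string(base.join("2025-01-27/job_run_123abc.jsonl"))?;
    assert!(content.contains("sequence_number"));
    Ok(())
}
```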
### 2. Unified Log Access Layer [PHASE 1]
**Responsibility**: Provide common interface for reading logs from JSONL files, used by both service and CLI.
```rust
// Core log access implementation
struct LogReader {
    logs_base_path: PathBuf,
}

impl LogReader {
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
}
```
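The `limit`/`has_more` contract below can be implemented without a separate count pass: read one entry past the limit and report whether anything was cut off. A minimal sketch:

```rust
// Sketch of the limit/has_more pagination used by get_job_logs:
// return at most `limit` entries and flag whether more remain.
fn paginate<T>(mut entries: Vec<T>, limit: usize) -> (Vec<T>, bool) {
    let has_more = entries.len() > limit;
    entries.truncate(limit);
    (entries, has_more)
}

fn main() {
    let (page, has_more) = paginate(vec![1, 2, 3], 2);
    assert_eq!(page, vec![1, 2]);
    assert!(has_more);
}
```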
**Protobuf Interface** (ensures CLI/Service consistency):
```proto
message JobLogsRequest {
  string job_run_id = 1;
  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
  int32 min_level = 3;        // LogLevel enum value
  uint32 limit = 4;
}

message JobLogsResponse {
  repeated JobLogEntry entries = 1;
  bool has_more = 2;
}
```
### 3. Metrics Templates [PHASE 1]
**Responsibility**: Centralized metric definitions to avoid string duplication.
```rust
// Central location for all metric definitions
mod metric_templates {
    use std::collections::HashMap;

    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";

    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
        let mut labels = HashMap::new();
        labels.insert("job_run_id".to_string(), job_run_id.to_string());
        labels.insert("job_type".to_string(), job_type.to_string());
        labels
    }
}
```
### 4. Metrics Aggregator [PHASE 1]
**Responsibility**: Process `MetricPoint` messages and expose Prometheus format with safe cardinality.
```rust
struct MetricsAggregator {
    metrics: HashMap<String, MetricFamily>,
}

impl MetricsAggregator {
    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
    fn generate_prometheus_output(&self) -> String;
}
```
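Rendering one sample line of the exposition format can be sketched with the standard library; `BTreeMap` gives deterministic label order, and label-value escaping is omitted for brevity (this is a sketch, not the final formatter):

```rust
use std::collections::BTreeMap;

// Sketch of rendering one sample line in Prometheus exposition format,
// e.g. name{label="value"} 45.2
fn format_sample(name: &str, labels: &BTreeMap<String, String>, value: f64) -> String {
    let rendered: Vec<String> = labels
        .iter()
        .map(|(k, v)| format!("{k}=\"{v}\""))
        .collect();
    format!("{name}{{{}}} {value}", rendered.join(","))
}

fn main() {
    let mut labels = BTreeMap::new();
    labels.insert("job_run_id".to_string(), "job_run_123abc".to_string());
    let line = format_sample("databuild_job_runtime_seconds", &labels, 45.2);
    assert_eq!(line, r#"databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2"#);
}
```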
**Safe Prometheus Output** (NO partition labels):
```
# HELP databuild_job_runtime_seconds Job execution time in seconds
# TYPE databuild_job_runtime_seconds gauge
databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2
# HELP databuild_rows_processed_total Total rows processed by job
# TYPE databuild_rows_processed_total counter
databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000
```
## API Implementation
### REST Endpoints [PHASE 1]
**Get Job Logs**:
```
GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
```
Response: Array of `JobLogEntry` objects with filtering support.
**Prometheus Metrics Scraping**:
```
GET /api/v1/metrics
```
Response: All metrics in Prometheus exposition format.
## Configuration
### Environment Variables
```bash
# Log storage configuration
DATABUILD_LOGS_DIR=/logs/databuild # Log storage directory
# Processing configuration
DATABUILD_LOG_REFRESH_INTERVAL_MS=1000 # Fixed refresh interval (1s)
DATABUILD_LOG_CACHE_SIZE=100 # LRU cache size for job logs
```
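Resolving these variables with the documented defaults might look like the following sketch; falling back to the default on a malformed value (rather than aborting) is an assumption here, not a stated requirement:

```rust
use std::env;
use std::time::Duration;

// Sketch: resolve DATABUILD_LOG_REFRESH_INTERVAL_MS with its 1s default.
// Unset or malformed values fall back to the default.
fn log_refresh_interval() -> Duration {
    let ms = env::var("DATABUILD_LOG_REFRESH_INTERVAL_MS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(1000);
    Duration::from_millis(ms)
}

fn main() {
    // With the variable unset, the default applies.
    assert_eq!(log_refresh_interval(), Duration::from_millis(1000));
}
```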
## Implementation Phases
### Phase 1: Core Implementation [FUTURE]
**Goal**: Basic log consumption and storage with REST API for log retrieval and Prometheus metrics.
**Deliverables**:
- Fix `JobLogEntry` protobuf interface (partition_ref → outputs)
- LogCollector with JSONL file writing
- LogProcessor with fixed refresh intervals
- REST API endpoints for job logs and Prometheus metrics
- MetricsAggregator with cardinality-safe output
- Centralized metric templates module
**Success Criteria**:
- Job logs are captured and stored reliably
- REST API can retrieve logs by job run ID and time range
- Prometheus metrics are exposed at `/api/v1/metrics` endpoint without cardinality issues
- System handles concurrent job execution without data corruption
- All metric names/labels are defined in central location
### Phase 2: Advanced Features [FUTURE]
**Goal**: Performance optimizations and production features.
**Deliverables**:
- Adaptive batching based on system load
- Automatic log retention and cleanup
- Web app integration for log viewing
- Rate limiting for high-volume jobs
- Performance monitoring and alerting
## Testing Strategy
### Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]
**Unit Tests**:
- JSONL parsing and serialization (basic happy path)
- Metrics aggregation and Prometheus formatting (template correctness)
- API endpoint responses (log retrieval by job_run_id)
**Integration Tests**:
- End-to-end: wrapper stdout → JSONL file → API response
- Concurrent job log collection (2-3 jobs simultaneously)
- Prometheus metrics scraping endpoint
**Key Principle**: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.
## Future Extensions
### Performance Optimizations [FUTURE]
- Adaptive refresh intervals based on load
- Log compression for storage efficiency
- Advanced caching strategies
### Production Features [FUTURE]
- Automatic log retention and cleanup policies
- Integration with external log collection tools
- Web app log viewing and search capabilities
### Monitoring Integration [FUTURE]
- Grafana dashboard templates
- Alerting on log system health
- Performance metrics for log processing pipeline
## Success Criteria
1. **Reliable Log Capture**: All job wrapper output captured without loss
2. **API Functionality**: Can retrieve logs by job run ID and time range
3. **Safe Metrics**: Prometheus endpoint works without cardinality explosion
4. **Correctness**: No duplicated metric templates, centralized definitions
5. **Concurrent Safety**: Multiple jobs can write logs simultaneously without corruption
6. **Simple Testing**: Test suite covers core functionality with minimal brittleness

@@ -2,9 +2,9 @@
## Status
- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [MOSTLY DONE - heartbeating and metrics implemented]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]
## Required Reading
@@ -84,7 +84,7 @@ message JobEvent { // [DONE - as WrapperJobEvent]
2. Wrapper validates configuration [DONE]
3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
4. Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
5. Wrapper emits periodic `heartbeat` events (every 30s) [DONE]
6. Wrapper detects job completion [DONE]
7. Wrapper emits `task_success`/`task_failed` event [DONE]
8. Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
@@ -197,25 +197,32 @@ The graph component will: [FUTURE]
For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
### Heartbeating [DONE]
Fixed 30-second heartbeat interval (configurable via `DATABUILD_HEARTBEAT_INTERVAL_MS`):
```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "JobEvent": {
      "event_type": "heartbeat",
      "metadata": {
        "memory_usage_mb": "1024.256",
        "cpu_usage_percent": "85.200"
      }
    }
  }
}
```
**Implementation Details:**
- Uses sysinfo crate for cross-platform process monitoring
- Heartbeat thread communicates via channels with main thread
- Includes memory usage (MB) and CPU usage (%) with 3 decimal precision
- Configurable interval for testing (default 30s, test environments use 100ms)
- Proper dependency injection via LogSink trait for testability
### Log Bandwidth Limits [FUTURE]
To prevent log flooding:
@@ -226,14 +233,17 @@ To prevent log flooding:
## Testing Strategy
### Unit Tests [MOSTLY DONE]
- Log parsing and serialization [DONE - protobuf serde]
- State machine transitions [DONE - JobLogEntry sequence validation]
- Heartbeat functionality [DONE - with dependency injection]
- CPU/memory metrics collection [DONE - with configurable intervals]
- Exit code categorization [FUTURE]
- Rate limiting behavior [FUTURE]
### Integration Tests [PARTIAL]
- Full job execution lifecycle [DONE - via e2e tests]
- Resource metrics validation [DONE - CPU-intensive workload testing]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]
@@ -272,11 +282,13 @@ This phase delivers a working end-to-end system that can be continuously evolved
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]
### Phase 1: Core Protocol [MOSTLY DONE]
- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [DONE - with CPU/memory metrics]
- Resource metrics collection [DONE - CPU time, peak memory, runtime]
- Dependency injection for testability [DONE - LogSink trait pattern]
- Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]
@@ -313,8 +325,22 @@ This phase delivers a working end-to-end system that can be continuously evolved
3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
4. Gradually add features per implementation phases [IN PROGRESS]
**Phase 1 Achievements:**
- ✅ Heartbeating support with CPU/memory metrics [DONE]
- ✅ Dependency injection for testability (LogSink trait) [DONE]
- ✅ Resource metrics collection (CPU time, peak memory, runtime) [DONE]
- ✅ Comprehensive test coverage for heartbeats and metrics [DONE]
- ✅ Configurable intervals for different environments [DONE]
**Remaining for Phase 1 Completion:**
- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]
**Recent Implementation Details:**
- Uses sysinfo 0.30 for cross-platform process monitoring
- Thread-safe heartbeat communication via mpsc channels
- Floating-point metrics with 3 decimal precision (f64)
- Environment variable configuration: `DATABUILD_HEARTBEAT_INTERVAL_MS`, `DATABUILD_METRICS_INTERVAL_MS`
- Robust test infrastructure with synthetic CPU-intensive workloads
- Proper CPU time calculation: (average_cpu_percent / 100.0) × wall_clock_time