diff --git a/CLAUDE.md b/CLAUDE.md
index 91da13e..b53a435 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -23,6 +23,7 @@ Please reference these for any related work, as they indicate key technical bias
 - We are building for the future, and choose to do "the right thing" rather than taking shortcuts to get unstuck. If you get stuck, pause and ask for help/input.
 - Do not add "unknown" results when parses or matches fail - these should always throw.
 - Compile time correctness is a super-power, and investment in it speeds up flywheel for development and user value.
+- **CLI/Service Interchangeability**: Both the CLI and the service must produce identical artifacts (BEL events, logs, metrics, outputs) in the same locations. Users should be able to build with one interface and query/inspect results from the other seamlessly. This principle applies to all DataBuild operations, not just builds.
 
 ## Build & Test
 ```bash
diff --git a/plans/graph-side-log-consumption.md b/plans/graph-side-log-consumption.md
new file mode 100644
index 0000000..128c155
--- /dev/null
+++ b/plans/graph-side-log-consumption.md
@@ -0,0 +1,347 @@
+# Graph-Side Log Consumption Plan
+
+## Status
+- Phase 0: Design [DONE]
+- Phase 1: Core Implementation [FUTURE]
+- Phase 2: Advanced Features [FUTURE]
+
+## Required Reading
+
+Before implementing this plan, engineers should thoroughly understand these design documents:
+
+- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
+- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
+- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
+- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
+- **[plans/job-wrapper.md](./job-wrapper.md)** - Job wrapper implementation and structured log protocol
+- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures
+
+## Overview
+
+This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits `JobLogEntry` protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and to expose metrics for Prometheus scraping.
+
+## Key Technical Decisions
+
+### 1. Storage Strategy: On-Disk with BEL Separation
+**Decision**: Store structured logs on disk, separate from the Build Event Log (BEL).
+
+**Motivation**:
+- Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
+- The BEL is optimized for event-sourcing patterns, not high-volume log queries
+- Separate storage allows independent scaling and retention policies
+
+### 2. File Organization: Date-Organized Structure
+**Decision**: Store logs in date-organized directories under a configurable base path: `$LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl`
+
+**Motivation**:
+- Enables efficient cleanup by date (future optimization)
+- Simplifies manual log management during development
+- Facilitates external log collection tools (future)
+
+### 3. Static Update Period (Phase 1)
+**Decision**: Use a fixed refresh interval for log processing. Adaptive batching is a future optimization.
+
+**Motivation**:
+- Simplicity for the initial implementation
+- Predictable performance characteristics
+- Easier to debug and test
+- Can be optimized later based on real usage patterns
+
+### 4. Manual Log Cleanup (Phase 1)
+**Decision**: No automatic log retention/cleanup in the initial implementation.
+
+**Motivation**:
+- We're in an early development phase
+- Manual cleanup is acceptable for now
+- Avoids complexity in the initial implementation
+- Automatic retention can be added as a future optimization
+
+### 5. Unified Telemetry Stream
+**Decision**: All `JobLogEntry` messages (logs, metrics, events) flow through the same JSONL files.
+
+**Motivation**:
+- Simplicity - a single consumption pipeline
+- Temporal consistency - metrics and logs are naturally correlated
+- A unified file format reduces complexity
+
+### 6. Cardinality-Safe Prometheus Metrics
+**Decision**: Prometheus metrics will NOT include partition references as labels, to avoid cardinality explosion.
+
+**Motivation**:
+- Partition labels (date × customer × region × etc.) would create massive cardinality
+- Focus on job-level and system-level metrics only
+- Use job_id and job_type labels instead of partition-specific labels
+
+### 7. Centralized Metric Templates for Correctness
+**Decision**: Define all Prometheus metric names and label templates in a central location to avoid string duplication.
+
+**Motivation**:
+- Prevents implicit coupling via duplicated string templates
+- Single source of truth for metric definitions
+- Easier to maintain consistency across the codebase
+
+### 8. Limited Scope (Phase 1)
+**Decision**: Phase 1 focuses on the log retrieval API and Prometheus metrics, excluding web app integration.
+
+**Motivation**:
+- Web app integration is part of a bigger update
+- Allows a focused implementation of core log consumption
+- An API-first approach enables multiple consumers
+
+### 9. Unified Execution Paths
+**Decision**: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.
+
+**Motivation**:
+- Building with the CLI and then querying from the service "just works"
+- Single source of truth for all build artifacts
+- Consistent behavior regardless of execution method
+- Simplifies debugging and operational workflows
+
+## Interface Issues to Fix
+
+### JobLogEntry Protobuf Update Required
+The current `JobLogEntry` definition needs updates:
+
+**Current (INCORRECT)**:
+```proto
+message JobLogEntry {
+  string partition_ref = 3;  // Single string
+  // ...
+}
+```
+
+**Required (CORRECT)**:
+```proto
+message JobLogEntry {
+  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
+  // ...
+}
+```
+
+**Rationale**: Jobs produce multiple partitions, and we should use the proper `PartitionRef` type for consistency with other interfaces.
+
+## Architecture
+
+### Storage Layout
+```
+/logs/databuild/
+├── 2025-01-27/
+│   ├── job_run_123abc.jsonl
+│   ├── job_run_456def.jsonl
+│   └── ...
+└── 2025-01-28/
+    └── ...
+```
+
+### File Format (JSONL)
+Each file contains one JSON object per line, representing a `JobLogEntry`:
+```json
+{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
+{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
+{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}
+```
+
+### Consumption Pipeline
+```
+Job Wrapper (stdout) → Graph Log Collector → JSONL Files
+                                                 ↓
+                                    Unified Log Access Layer
+                                        ↙               ↘
+                                 Service API          CLI API
+                                        ↓
+                            Metrics Aggregator → /api/v1/metrics
+```
+
+## Implementation Components
+
+### 1. Log Collector [PHASE 1]
+**Responsibility**: Consume job wrapper stdout and write to JSONL files.
+
+```rust
+struct LogCollector {
+    logs_dir: PathBuf,                   // /logs/databuild
+    active_files: HashMap<String, File>, // job_run_id -> file handle
+}
+
+impl LogCollector {
+    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
+    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
+    fn ensure_date_directory(&self) -> Result<PathBuf>;
+}
+```
+
+### 2. Unified Log Access Layer [PHASE 1]
+**Responsibility**: Provide a common interface for reading logs from JSONL files, used by both the service and the CLI.
+
+```rust
+// Core log access implementation
+struct LogReader {
+    logs_base_path: PathBuf,
+}
+
+impl LogReader {
+    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
+    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
+    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
+}
+```
+
+**Protobuf Interface** (ensures CLI/Service consistency):
+```proto
+message JobLogsRequest {
+  string job_run_id = 1;
+  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
+  int32 min_level = 3;        // LogLevel enum value
+  uint32 limit = 4;
+}
+
+message JobLogsResponse {
+  repeated JobLogEntry entries = 1;
+  bool has_more = 2;
+}
+```
+
+### 3. Metrics Templates [PHASE 1]
+**Responsibility**: Centralized metric definitions to avoid string duplication.
+
+```rust
+// Central location for all metric definitions
+mod metric_templates {
+    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
+    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
+    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
+    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";
+
+    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
+        let mut labels = HashMap::new();
+        labels.insert("job_run_id".to_string(), job_run_id.to_string());
+        labels.insert("job_type".to_string(), job_type.to_string());
+        labels
+    }
+}
+```
+
+### 4. Metrics Aggregator [PHASE 1]
+**Responsibility**: Process `MetricPoint` messages and expose Prometheus format with safe cardinality.
+
+```rust
+struct MetricsAggregator {
+    // metric key (name + labels) -> latest value; the value type is elided in this sketch
+    metrics: HashMap<String, f64>,
+}
+
+impl MetricsAggregator {
+    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
+    fn generate_prometheus_output(&self) -> String;
+}
+```
+
+**Safe Prometheus Output** (NO partition labels):
+```
+# HELP databuild_job_runtime_seconds Job execution time in seconds
+# TYPE databuild_job_runtime_seconds gauge
+databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2
+
+# HELP databuild_rows_processed_total Total rows processed by job
+# TYPE databuild_rows_processed_total counter
+databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000
+```
+
+## API Implementation
+
+### REST Endpoints [PHASE 1]
+
+**Get Job Logs**:
+```
+GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
+```
+Response: an array of `JobLogEntry` objects, with filtering support.
+
+**Prometheus Metrics Scraping**:
+```
+GET /api/v1/metrics
+```
+Response: all metrics in Prometheus exposition format.
+
+## Configuration
+
+### Environment Variables
+```bash
+# Log storage configuration
+DATABUILD_LOGS_DIR=/logs/databuild      # Log storage directory
+
+# Processing configuration
+DATABUILD_LOG_REFRESH_INTERVAL_MS=1000  # Fixed refresh interval (1s)
+DATABUILD_LOG_CACHE_SIZE=100            # LRU cache size for job logs
+```
+
+## Implementation Phases
+
+### Phase 1: Core Implementation [FUTURE]
+**Goal**: Basic log consumption and storage, with a REST API for log retrieval and Prometheus metrics.
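To make the path convention concrete, here is a small std-only sketch of how the configured base directory could resolve to a job run's JSONL file. The helper name and the date-as-string parameter are illustrative, not part of the planned API:

```rust
use std::env;
use std::path::PathBuf;

// Resolve $DATABUILD_LOGS_DIR/YYYY-MM-DD/{job_run_id}.jsonl, falling back to
// the documented default base directory when the variable is unset.
// (`log_path_for` is a hypothetical helper, not a planned interface.)
fn log_path_for(date: &str, job_run_id: &str) -> PathBuf {
    let base = env::var("DATABUILD_LOGS_DIR").unwrap_or_else(|_| "/logs/databuild".to_string());
    PathBuf::from(base).join(date).join(format!("{job_run_id}.jsonl"))
}

fn main() {
    let path = log_path_for("2025-01-27", "job_run_123abc");
    println!("{}", path.display()); // /logs/databuild/2025-01-27/job_run_123abc.jsonl (when the env var is unset)
}
```

Keeping this resolution in one place mirrors the "single source of truth" principle: both the collector (writing) and the reader (querying) would derive paths from the same function.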
+
+**Deliverables**:
+- Fix the `JobLogEntry` protobuf interface (partition_ref → outputs)
+- LogCollector with JSONL file writing
+- Log processing with a fixed refresh interval
+- REST API endpoints for job logs and Prometheus metrics
+- MetricsAggregator with cardinality-safe output
+- Centralized metric templates module
+
+**Success Criteria**:
+- Job logs are captured and stored reliably
+- The REST API can retrieve logs by job run ID and time range
+- Prometheus metrics are exposed at the `/api/v1/metrics` endpoint without cardinality issues
+- The system handles concurrent job execution without data corruption
+- All metric names/labels are defined in a central location
+
+### Phase 2: Advanced Features [FUTURE]
+**Goal**: Performance optimizations and production features.
+
+**Deliverables**:
+- Adaptive batching based on system load
+- Automatic log retention and cleanup
+- Web app integration for log viewing
+- Rate limiting for high-volume jobs
+- Performance monitoring and alerting
+
+## Testing Strategy
+
+### Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]
+
+**Unit Tests**:
+- JSONL parsing and serialization (basic happy path)
+- Metrics aggregation and Prometheus formatting (template correctness)
+- API endpoint responses (log retrieval by job_run_id)
+
+**Integration Tests**:
+- End-to-end: wrapper stdout → JSONL file → API response
+- Concurrent job log collection (2-3 jobs simultaneously)
+- Prometheus metrics scraping endpoint
+
+**Key Principle**: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.
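The happy-path JSONL round-trip named above can be sketched with the standard library alone. The entry shape is simplified and the serialization is hand-rolled purely for illustration; the real pipeline would serialize `JobLogEntry` protobuf messages:

```rust
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};

// Write a few JSONL lines the way a collector would, then read them back
// the way a reader would. File and field names are illustrative only.
fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("databuild-jsonl-sketch");
    fs::create_dir_all(&dir)?;
    let path = dir.join("job_run_123abc.jsonl");

    // Write: one JSON object per line, in sequence order.
    let mut file = File::create(&path)?;
    for (seq, msg) in [(1, "Processing started"), (2, "Processing finished")] {
        writeln!(file, r#"{{"job_id":"job_run_123abc","sequence_number":{seq},"message":"{msg}"}}"#)?;
    }

    // Read: each line is an independent record.
    let lines: Vec<String> = BufReader::new(File::open(&path)?)
        .lines()
        .collect::<Result<_, _>>()?;
    assert_eq!(lines.len(), 2);
    assert!(lines[0].contains("\"sequence_number\":1"));

    fs::remove_file(&path).ok();
    Ok(())
}
```

Because each record is a self-delimiting line, a partially written final line (e.g. from a crashed wrapper) can be detected and skipped without corrupting earlier records, which is one reason JSONL suits this append-only pattern.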
+
+## Future Extensions
+
+### Performance Optimizations [FUTURE]
+- Adaptive refresh intervals based on load
+- Log compression for storage efficiency
+- Advanced caching strategies
+
+### Production Features [FUTURE]
+- Automatic log retention and cleanup policies
+- Integration with external log collection tools
+- Web app log viewing and search capabilities
+
+### Monitoring Integration [FUTURE]
+- Grafana dashboard templates
+- Alerting on log system health
+- Performance metrics for the log processing pipeline
+
+## Success Criteria
+
+1. **Reliable Log Capture**: All job wrapper output is captured without loss
+2. **API Functionality**: Logs can be retrieved by job run ID and time range
+3. **Safe Metrics**: The Prometheus endpoint works without cardinality explosion
+4. **Correctness**: No duplicated metric templates; definitions are centralized
+5. **Concurrent Safety**: Multiple jobs can write logs simultaneously without corruption
+6. **Simple Testing**: The test suite covers core functionality with minimal brittleness
\ No newline at end of file
diff --git a/plans/job-wrapper.md b/plans/job-wrapper.md
index 04810c9..d7d1651 100644
--- a/plans/job-wrapper.md
+++ b/plans/job-wrapper.md
@@ -2,9 +2,9 @@
 
 ## Status
 - Phase 0: Minimal Bootstrap [DONE]
-- Phase 1: Core Protocol [PARTIAL]
+- Phase 1: Core Protocol [MOSTLY DONE - heartbeating and metrics implemented]
 - Phase 2: Platform Support [FUTURE]
-- Phase 3: Production Hardening [FUTURE]
+- Phase 3: Production Hardening [FUTURE]
 - Phase 4: Advanced Features [FUTURE]
 
 ## Required Reading
@@ -84,7 +84,7 @@ message JobEvent { // [DONE - as WrapperJobEvent]
 2. Wrapper validates configuration [DONE]
 3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
 4. Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
-5. Wrapper emits periodic `heartbeat` events (every 30s) [FUTURE]
+5. Wrapper emits periodic `heartbeat` events (every 30s) [DONE]
 6. Wrapper detects job completion [DONE]
 7. Wrapper emits `task_success`/`task_failed` event [DONE]
 8. Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
@@ -197,25 +197,32 @@ The graph component will: [FUTURE]
 
 For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
 
-### Heartbeating [FUTURE]
+### Heartbeating [DONE]
 
-Fixed 30-second heartbeat interval (based on Kubernetes best practices):
+Fixed 30-second heartbeat interval (configurable via `DATABUILD_HEARTBEAT_INTERVAL_MS`):
 
 ```json
 {
   "timestamp": "2025-01-27T10:30:45Z",
   "content": {
-    "event": {
+    "JobEvent": {
       "event_type": "heartbeat",
       "metadata": {
-        "memory_usage_mb": "1024",
-        "cpu_usage_percent": "85.2"
+        "memory_usage_mb": "1024.256",
+        "cpu_usage_percent": "85.200"
       }
     }
   }
 }
 ```
 
+**Implementation Details:**
+- Uses the sysinfo crate for cross-platform process monitoring
+- The heartbeat thread communicates with the main thread via channels
+- Includes memory usage (MB) and CPU usage (%) with 3-decimal precision
+- Configurable interval for testing (default 30s; test environments use 100ms)
+- Proper dependency injection via the LogSink trait for testability
+
 ### Log Bandwidth Limits [FUTURE]
 
 To prevent log flooding:
@@ -226,14 +233,17 @@ To prevent log flooding:
 
 ## Testing Strategy
 
-### Unit Tests [FUTURE]
-- Log parsing and serialization
-- Exit code categorization
-- Rate limiting behavior
-- State machine transitions
+### Unit Tests [MOSTLY DONE]
+- Log parsing and serialization [DONE - protobuf serde]
+- State machine transitions [DONE - JobLogEntry sequence validation]
+- Heartbeat functionality [DONE - with dependency injection]
+- CPU/memory metrics collection [DONE - with configurable intervals]
+- Exit code categorization [FUTURE]
+- Rate limiting behavior [FUTURE]
 
 ### Integration Tests [PARTIAL]
 - Full job execution lifecycle [DONE - via e2e tests]
+- Resource metrics validation [DONE - CPU-intensive workload testing]
 - Platform-specific log tailing [PARTIAL - local only]
 - Fast job completion handling [DONE]
 - Large log volume handling [FUTURE]
@@ -272,11 +282,13 @@ This phase delivers a working end-to-end system that can be continuously evolved
 - Modified Bazel rules to use job_wrapper [DONE]
 - All e2e tests passing [DONE]
 
-### Phase 1: Core Protocol [PARTIAL]
+### Phase 1: Core Protocol [MOSTLY DONE]
 - Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
 - Implement structured logger [DONE - JSON serialization to stdout]
 - Add error handling and exit codes [PARTIAL - basic forwarding only]
-- Implement heartbeating [FUTURE]
+- Implement heartbeating [DONE - with CPU/memory metrics]
+- Resource metrics collection [DONE - CPU time, peak memory, runtime]
+- Dependency injection for testability [DONE - LogSink trait pattern]
 - Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
 - MetricPoint message support [FUTURE]
 - Advanced error categorization [FUTURE]
@@ -313,8 +325,22 @@ This phase delivers a working end-to-end system that can be continuously evolved
 3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
 4. Gradually add features per implementation phases [IN PROGRESS]
 
-**Immediate Next Steps for Phase 1 Completion:**
-- Add heartbeating support [FUTURE]
+**Phase 1 Achievements:**
+- ✅ Heartbeating support with CPU/memory metrics [DONE]
+- ✅ Dependency injection for testability (LogSink trait) [DONE]
+- ✅ Resource metrics collection (CPU time, peak memory, runtime) [DONE]
+- ✅ Comprehensive test coverage for heartbeats and metrics [DONE]
+- ✅ Configurable intervals for different environments [DONE]
+
+**Remaining for Phase 1 Completion:**
 - Implement MetricPoint logging [FUTURE]
-- Add graph-side structured log consumption [FUTURE]
-- Enhanced error categorization and exit code mapping [FUTURE]
\ No newline at end of file
+- Add graph-side structured log consumption [FUTURE]
+- Enhanced error categorization and exit code mapping [FUTURE]
+
+**Recent Implementation Details:**
+- Uses sysinfo 0.30 for cross-platform process monitoring
+- Thread-safe heartbeat communication via mpsc channels
+- Floating-point metrics with 3-decimal precision (f64)
+- Environment variable configuration: `DATABUILD_HEARTBEAT_INTERVAL_MS`, `DATABUILD_METRICS_INTERVAL_MS`
+- Robust test infrastructure with synthetic CPU-intensive workloads
+- Proper CPU time calculation: (average_cpu_percent / 100.0) × wall_clock_time
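The CPU time formula in the last bullet above can be stated directly as code; a tiny sketch (the function name is illustrative, not from the wrapper's codebase):

```rust
/// CPU time per the formula in the notes above:
/// cpu_seconds = (average_cpu_percent / 100.0) × wall_clock_seconds
fn cpu_time_seconds(average_cpu_percent: f64, wall_clock_seconds: f64) -> f64 {
    (average_cpu_percent / 100.0) * wall_clock_seconds
}

fn main() {
    // e.g. a job averaging 85.2% CPU over 50 wall-clock seconds
    let cpu = cpu_time_seconds(85.2, 50.0);
    assert!((cpu - 42.6).abs() < 1e-9); // ≈ 42.6 CPU-seconds
    println!("{cpu:.3}");
}
```

Note this approximates total CPU time from sampled utilization; on multi-core workloads the average percentage can exceed 100, and the same formula still applies.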