Graph-Side Log Consumption Plan
Status
- Phase 0: Design [DONE]
- Phase 1: Core Implementation [COMPLETED ✅]
- Phase 2: Advanced Features [FUTURE]
Phase 1 Implementation Status
✅ Core Components COMPLETED
- JobLogEntry protobuf interface fixed - Updated databuild.proto to use repeated PartitionRef outputs instead of single string partition_ref
- LogCollector implemented - Consumes job wrapper stdout, parses structured logs, writes to date-organized JSONL files (logs/databuild/YYYY-MM-DD/job_run_id.jsonl)
- Graph integration completed - LogCollector integrated into graph execution with UUID-based job ID coordination between graph and wrapper
- Unified Log Access Layer implemented - Protobuf-based LogReader interface ensuring CLI/Service consistency for log retrieval
- Centralized metric templates - All metric definitions centralized in databuild/metric_templates.rs module
- MetricsAggregator with cardinality safety - Prometheus output without partition reference explosion, using job labels instead
- REST API endpoints implemented - /api/v1/jobs/{job_run_id}/logs and /api/v1/metrics fully functional
- Graph-level job_label enrichment - Solved cardinality issue via LogCollector enrichment pattern, consistent with design philosophy
✅ Key Architectural Decisions Implemented
- Cardinality-safe metrics: Job labels used instead of high-cardinality partition references in Prometheus output
- Graph-level enrichment: LogCollector enriches both WrapperJobEvent and Manifest entries with job_label from graph context
- JSONL storage: Date-organized file structure with robust error handling and concurrent access safety
- Unified execution paths: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations
- Job ID coordination: UUID-based job run IDs shared between graph execution and job wrapper via environment variable
✅ All Success Criteria Met
- ✅ Reliable Log Capture: All job wrapper output captured without loss through LogCollector
- ✅ API Functionality: REST API retrieves logs by job run ID, with optional timestamp and log level filtering
- ✅ Safe Metrics: Prometheus endpoint works without cardinality explosion (job labels only, no partition refs)
- ✅ Correctness: No duplicated metric templates, all definitions centralized in metric_templates.rs
- ✅ Concurrent Safety: Multiple jobs write logs simultaneously without corruption via separate JSONL files per job
- ✅ Simple Testing: Test suite covers core functionality with minimal brittleness, all tests passing
🏗️ Implementation Files
- databuild/databuild.proto - Updated protobuf interfaces
- databuild/log_collector.rs - Core log collection and JSONL writing
- databuild/log_access.rs - Unified log reading interface
- databuild/metric_templates.rs - Centralized metric definitions
- databuild/metrics_aggregator.rs - Cardinality-safe Prometheus output
- databuild/service/handlers.rs - REST API endpoints implementation
- databuild/graph/execute.rs - Integration point for LogCollector
- databuild/job/main.rs - Job wrapper structured log emission
Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
- DESIGN.md - Overall DataBuild architecture and job execution model
- design/core-build.md - Core build semantics and job lifecycle state machines
- design/build-event-log.md - Event sourcing model and BEL integration
- design/observability.md - Observability strategy and telemetry requirements
- plans/job-wrapper.md - Job wrapper implementation and structured log protocol
- databuild.proto - System interfaces and data structures
Overview
This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits JobLogEntry protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and expose metrics for Prometheus scraping.
Key Technical Decisions
1. Storage Strategy: On-Disk with BEL Separation
Decision: Store structured logs on disk separate from the Build Event Log (BEL).
Motivation:
- Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
- BEL is optimized for event-sourcing patterns, not high-volume log queries
- Separate storage allows independent scaling and retention policies
2. File Organization: Date-Organized Structure
Decision: Store logs in configurable-base, date-organized directories: $LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl
Motivation:
- Enables efficient cleanup by date (future optimization)
- Simplifies manual log management during development
- Facilitates external log collection tools (future)
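The path layout above can be sketched as a small helper. This is only an illustration: the function name job_log_path is hypothetical, and the date is passed in as an already formatted YYYY-MM-DD string rather than derived from the system clock.

```rust
use std::path::{Path, PathBuf};

/// Builds `<base>/<YYYY-MM-DD>/<job_run_id>.jsonl`.
/// Assumption: `date` is already formatted as YYYY-MM-DD; this sketch
/// neither validates it nor reads the system clock.
pub fn job_log_path(base: &Path, date: &str, job_run_id: &str) -> PathBuf {
    base.join(date).join(format!("{}.jsonl", job_run_id))
}
```

Keeping the date as an explicit argument makes the helper easy to test and leaves the choice of clock and time zone to the caller.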
3. Static Update Period (Phase 1)
Decision: Use a fixed refresh interval for log processing. Adaptive batching is a future optimization.
Motivation:
- Simplicity for initial implementation
- Predictable performance characteristics
- Easier to debug and test
- Can optimize later based on real usage patterns
4. Manual Log Cleanup (Phase 1)
Decision: No automatic log retention/cleanup in the initial implementation.
Motivation:
- We're in an early development phase
- Manual cleanup acceptable for now
- Avoids complexity in initial implementation
- Automatic retention can be added as future optimization
5. Unified Telemetry Stream
Decision: All JobLogEntry messages (logs, metrics, events) flow through the same JSONL files.
Motivation:
- Simplicity - single consumption pipeline
- Temporal consistency - metrics and logs naturally correlated
- Unified file format reduces complexity
6. Cardinality-Safe Prometheus Metrics
Decision: Prometheus metrics will NOT include partition references as labels to avoid cardinality explosion.
Motivation:
- Partition labels (date × customer × region × etc.) would create massive cardinality
- Focus on job-level and system-level metrics only
- Use job_id and job_type labels instead of partition-specific labels
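To make the cardinality argument concrete: the worst-case number of time series for one metric is the product of the number of distinct values of each label. The sketch below uses a hypothetical helper name (max_series), and the figures in the usage note are made-up illustrations, not measurements from DataBuild.

```rust
/// Upper bound on the number of time series a single metric can produce:
/// the product of the distinct-value counts of its labels.
/// An empty label set yields 1 (a single unlabeled series).
pub fn max_series(label_cardinalities: &[u64]) -> u64 {
    label_cardinalities.iter().product()
}
```

With hypothetical counts of 365 dates, 1,000 customers, and 10 regions, max_series(&[365, 1_000, 10]) gives 3,650,000 potential series per metric, whereas a single label with 20 distinct values gives 20.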
7. Centralized Metric Templates for Correctness
Decision: Define all Prometheus metric names and label templates in a central location to avoid string duplication.
Motivation:
- Prevents implicit coupling via duplicated string templates
- Single source of truth for metric definitions
- Easier to maintain consistency across codebase
8. Limited Scope (Phase 1)
Decision: Phase 1 focuses on log retrieval API and Prometheus metrics, excluding web app integration.
Motivation:
- Web app integration is part of a bigger update
- Allows focused implementation on core log consumption
- API-first approach enables multiple consumers
9. Unified Execution Paths
Decision: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.
Motivation:
- Building with CLI then querying from service "just works"
- Single source of truth for all build artifacts
- Consistent behavior regardless of execution method
- Simplifies debugging and operational workflows
Interface Issues to Fix
JobLogEntry Protobuf Update Required
The current JobLogEntry definition needs updates:
Current (INCORRECT):
message JobLogEntry {
  string partition_ref = 3;  // Single string
  // ...
}
Required (CORRECT):
message JobLogEntry {
  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
  // ...
}
Rationale: Jobs produce multiple partitions, and we should use the proper PartitionRef type for consistency with other interfaces.
Architecture
Storage Layout
/logs/databuild/
├── 2025-01-27/
│   ├── job_run_123abc.jsonl
│   ├── job_run_456def.jsonl
│   └── ...
├── 2025-01-28/
│   └── ...
File Format (JSONL)
Each file contains one JSON object per line, representing a JobLogEntry:
{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}
Consumption Pipeline
Job Wrapper (stdout) → Graph Log Collector → JSONL Files
                                                 ↓
                                     Unified Log Access Layer
                                        ↙              ↘
                                 Service API         CLI API
                                      ↓
                           Metrics Aggregator → /api/v1/metrics
Implementation Components
1. Log Collector [PHASE 1]
Responsibility: Consume job wrapper stdout and write to JSONL files.
struct LogCollector {
    logs_dir: PathBuf,                    // /logs/databuild
    active_files: HashMap<String, File>,  // job_run_id -> file handle
}

impl LogCollector {
    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
    fn ensure_date_directory(&self) -> Result<PathBuf>;
}
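A minimal sketch of the consumption loop follows. The function name copy_log_lines is hypothetical, and the sketch assumes each line of wrapper stdout is already one serialized JobLogEntry. Parsing, validation, and job_label enrichment are deliberately left out, so this is not the actual LogCollector implementation.

```rust
use std::io::{self, BufRead, Write};

/// Copies every non-empty line from `reader` to `writer`, one entry per line,
/// and returns the number of lines written.
/// Assumption: each input line is already one serialized JobLogEntry;
/// parsing and enrichment are omitted from this sketch.
pub fn copy_log_lines<R: BufRead, W: Write>(reader: R, mut writer: W) -> io::Result<usize> {
    let mut written = 0;
    for line in reader.lines() {
        let line = line?; // propagate read errors instead of dropping output
        if line.trim().is_empty() {
            continue; // skip blank lines so the file stays valid JSONL
        }
        writer.write_all(line.as_bytes())?;
        writer.write_all(b"\n")?;
        written += 1;
    }
    writer.flush()?;
    Ok(written)
}
```

Because the function is generic over BufRead and Write, the same loop could be driven by a BufReader<ChildStdout> and a File opened in append mode in production, and by in-memory buffers in tests.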
2. Unified Log Access Layer [PHASE 1]
Responsibility: Provide a common interface for reading logs from JSONL files, used by both service and CLI.
// Core log access implementation
struct LogReader {
    logs_base_path: PathBuf,
}

impl LogReader {
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
}
Protobuf Interface (ensures CLI/Service consistency):
message JobLogsRequest {
  string job_run_id = 1;
  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
  int32 min_level = 3;        // LogLevel enum value
  uint32 limit = 4;
}

message JobLogsResponse {
  repeated JobLogEntry entries = 1;
  bool has_more = 2;
}
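One way the request fields could map onto a filter is sketched below. Entry is a simplified stand-in for JobLogEntry, and filter_entries is a hypothetical name. Two semantics are assumptions, since the plan does not pin them down: since_timestamp is treated as inclusive, and a limit of 0 is treated as "no limit".

```rust
/// Simplified stand-in for JobLogEntry with only the fields the filter needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub timestamp_ns: i64,
    pub level: i32,
    pub message: String,
}

/// Applies since_timestamp, min_level, and limit in file order and returns
/// the selected entries plus a has_more flag.
/// Assumptions: `since_ns` is inclusive and `limit == 0` means "no limit".
pub fn filter_entries(
    entries: &[Entry],
    since_ns: i64,
    min_level: i32,
    limit: u32,
) -> (Vec<Entry>, bool) {
    let mut matching = entries
        .iter()
        .filter(|e| e.timestamp_ns >= since_ns && e.level >= min_level);
    if limit == 0 {
        return (matching.cloned().collect(), false);
    }
    // Take one page, then peek for one more match to set has_more.
    let page: Vec<Entry> = matching.by_ref().take(limit as usize).cloned().collect();
    let has_more = matching.next().is_some();
    (page, has_more)
}
```

Setting has_more by checking for one further match avoids counting the whole file when a small page is requested.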
3. Metrics Templates [PHASE 1]
Responsibility: Centralized metric definitions to avoid string duplication.
// Central location for all metric definitions
mod metric_templates {
    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";

    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
        let mut labels = HashMap::new();
        labels.insert("job_run_id".to_string(), job_run_id.to_string());
        labels.insert("job_type".to_string(), job_type.to_string());
        labels
    }
}
4. Metrics Aggregator [PHASE 1]
Responsibility: Process MetricPoint messages and expose Prometheus format with safe cardinality.
struct MetricsAggregator {
    metrics: HashMap<String, MetricFamily>,
}

impl MetricsAggregator {
    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
    fn generate_prometheus_output(&self) -> String;
}
Safe Prometheus Output (NO partition labels):
# HELP databuild_job_runtime_seconds Job execution time in seconds
# TYPE databuild_job_runtime_seconds gauge
databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2
# HELP databuild_rows_processed_total Total rows processed by job
# TYPE databuild_rows_processed_total counter
databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000
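Output in this shape could be produced by a renderer along the following lines. Sample and render_family are hypothetical names for this sketch, not the types used by MetricsAggregator. Labels are kept in a BTreeMap so that the output order is deterministic.

```rust
use std::collections::BTreeMap;

/// One sample: a label set plus a value. BTreeMap keeps labels sorted by name.
pub struct Sample {
    pub labels: BTreeMap<String, String>,
    pub value: f64,
}

/// Escapes backslash, double quote, and newline in a label value,
/// as the Prometheus text format requires.
fn escape_label_value(v: &str) -> String {
    v.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Renders one metric family: a HELP line, a TYPE line, then one line per sample.
pub fn render_family(name: &str, help: &str, kind: &str, samples: &[Sample]) -> String {
    let mut out = format!("# HELP {} {}\n# TYPE {} {}\n", name, help, name, kind);
    for s in samples {
        let labels: Vec<String> = s
            .labels
            .iter()
            .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
            .collect();
        if labels.is_empty() {
            out.push_str(&format!("{} {}\n", name, s.value));
        } else {
            out.push_str(&format!("{}{{{}}} {}\n", name, labels.join(","), s.value));
        }
    }
    out
}
```

Sorting labels by name keeps the scrape output stable between runs, which makes it easier to compare in tests.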
API Implementation
REST Endpoints [PHASE 1]
Get Job Logs:
GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}
Response: Array of JobLogEntry objects, filtered by the since and level query parameters.
Prometheus Metrics Scraping:
GET /api/v1/metrics
Response: All metrics in Prometheus exposition format.
Configuration
Environment Variables
# Log storage configuration
DATABUILD_LOGS_DIR=/logs/databuild # Log storage directory
# Processing configuration
DATABUILD_LOG_REFRESH_INTERVAL_MS=1000 # Fixed refresh interval (1s)
DATABUILD_LOG_CACHE_SIZE=100 # LRU cache size for job logs
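A sketch of how these variables might be read is shown below. LogConfig and load_config are hypothetical names. Falling back to the values listed above when a variable is missing or unparsable is an assumption of this sketch, not documented behavior.

```rust
use std::path::PathBuf;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub struct LogConfig {
    pub logs_dir: PathBuf,
    pub refresh_interval: Duration,
    pub cache_size: usize,
}

/// Builds the config from a lookup function so it can be tested without
/// touching the process environment.
/// Assumption: missing or unparsable values fall back to the listed defaults.
pub fn load_config(get: impl Fn(&str) -> Option<String>) -> LogConfig {
    let logs_dir = get("DATABUILD_LOGS_DIR").unwrap_or_else(|| "/logs/databuild".to_string());
    let refresh_ms = get("DATABUILD_LOG_REFRESH_INTERVAL_MS")
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(1000);
    let cache_size = get("DATABUILD_LOG_CACHE_SIZE")
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(100);
    LogConfig {
        logs_dir: PathBuf::from(logs_dir),
        refresh_interval: Duration::from_millis(refresh_ms),
        cache_size,
    }
}
```

In a real process the lookup would typically be load_config(|key| std::env::var(key).ok()).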
Implementation Phases
Phase 1: Core Implementation [COMPLETED ✅]
Goal: Basic log consumption and storage with REST API for log retrieval and Prometheus metrics.
Deliverables ✅:
- ✅ Fix JobLogEntry protobuf interface (partition_ref → outputs)
- ✅ LogCollector with JSONL file writing and graph-level job_label enrichment
- ✅ LogReader with unified protobuf interface for CLI/Service consistency
- ✅ REST API endpoints for job logs and Prometheus metrics
- ✅ MetricsAggregator with cardinality-safe output (job labels, not partition refs)
- ✅ Centralized metric templates module
Success Criteria ✅:
- ✅ Job logs are captured and stored reliably via LogCollector integration
- ✅ REST API can retrieve logs by job run ID and time range with filtering
- ✅ Prometheus metrics are exposed at the /api/v1/metrics endpoint without cardinality issues
- ✅ System handles concurrent job execution without data corruption (separate JSONL files per job)
- ✅ All metric names/labels are defined in a central location (metric_templates.rs)
Phase 2: Advanced Features [FUTURE]
Goal: Performance optimizations and production features.
Deliverables:
- Adaptive batching based on system load
- Automatic log retention and cleanup
- Web app integration for log viewing
- Rate limiting for high-volume jobs
- Performance monitoring and alerting
Testing Strategy
Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]
Unit Tests:
- JSONL parsing and serialization (basic happy path)
- Metrics aggregation and Prometheus formatting (template correctness)
- API endpoint responses (log retrieval by job_run_id)
Integration Tests:
- End-to-end: wrapper stdout → JSONL file → API response
- Concurrent job log collection (2-3 jobs simultaneously)
- Prometheus metrics scraping endpoint
Key Principle: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.
Future Extensions
Performance Optimizations [FUTURE]
- Adaptive refresh intervals based on load
- Log compression for storage efficiency
- Advanced caching strategies
Production Features [FUTURE]
- Automatic log retention and cleanup policies
- Integration with external log collection tools
- Web app log viewing and search capabilities
Monitoring Integration [FUTURE]
- Grafana dashboard templates
- Alerting on log system health
- Performance metrics for log processing pipeline
Success Criteria
- Reliable Log Capture: All job wrapper output captured without loss
- API Functionality: Can retrieve logs by job run ID and time range
- Safe Metrics: Prometheus endpoint works without cardinality explosion
- Correctness: No duplicated metric templates, centralized definitions
- Concurrent Safety: Multiple jobs can write logs simultaneously without corruption
- Simple Testing: Test suite covers core functionality with minimal brittleness