databuild/plans/14-graph-side-log-consumption.md

16 KiB
Raw Permalink Blame History

Graph-Side Log Consumption Plan

Status

  • Phase 0: Design [DONE]
  • Phase 1: Core Implementation [COMPLETED ]
  • Phase 2: Advanced Features [FUTURE]

Phase 1 Implementation Status

Core Components COMPLETED

  1. JobLogEntry protobuf interface fixed - Updated databuild.proto to use repeated PartitionRef outputs instead of single string partition_ref
  2. LogCollector implemented - Consumes job wrapper stdout, parses structured logs, writes to date-organized JSONL files (logs/databuild/YYYY-MM-DD/job_run_id.jsonl)
  3. Graph integration completed - LogCollector integrated into graph execution with UUID-based job ID coordination between graph and wrapper
  4. Unified Log Access Layer implemented - Protobuf-based LogReader interface ensuring CLI/Service consistency for log retrieval
  5. Centralized metric templates - All metric definitions centralized in databuild/metric_templates.rs module
  6. MetricsAggregator with cardinality safety - Prometheus output without partition reference explosion, using job labels instead
  7. REST API endpoints implemented - /api/v1/jobs/{job_run_id}/logs and /api/v1/metrics fully functional
  8. Graph-level job_label enrichment - Solved cardinality issue via LogCollector enrichment pattern, consistent with design philosophy

Key Architectural Decisions Implemented

  • Cardinality-safe metrics: Job labels used instead of high-cardinality partition references in Prometheus output
  • Graph-level enrichment: LogCollector enriches both WrapperJobEvent and Manifest entries with job_label from graph context
  • JSONL storage: Date-organized file structure with robust error handling and concurrent access safety
  • Unified execution paths: Both CLI and service builds produce identical BEL events and JSONL logs in same locations
  • Job ID coordination: UUID-based job run IDs shared between graph execution and job wrapper via environment variable

All Success Criteria Met

  • Reliable Log Capture: All job wrapper output captured without loss through LogCollector
  • API Functionality: REST API retrieves logs by job run ID, timestamp filtering, and log level filtering
  • Safe Metrics: Prometheus endpoint works without cardinality explosion (job labels only, no partition refs)
  • Correctness: No duplicated metric templates, all definitions centralized in metric_templates.rs
  • Concurrent Safety: Multiple jobs write logs simultaneously without corruption via separate JSONL files per job
  • Simple Testing: Test suite covers core functionality with minimal brittleness, all tests passing

🏗️ Implementation Files

  • databuild/databuild.proto - Updated protobuf interfaces
  • databuild/log_collector.rs - Core log collection and JSONL writing
  • databuild/log_access.rs - Unified log reading interface
  • databuild/metric_templates.rs - Centralized metric definitions
  • databuild/metrics_aggregator.rs - Cardinality-safe Prometheus output
  • databuild/service/handlers.rs - REST API endpoints implementation
  • databuild/graph/execute.rs - Integration point for LogCollector
  • databuild/job/main.rs - Job wrapper structured log emission

Required Reading

Before implementing this plan, engineers should thoroughly understand these design documents:

Overview

This plan describes the graph-side implementation for consuming structured logs emitted by the job wrapper. The job wrapper emits JobLogEntry protobuf messages to stdout during job execution. The graph must consume these logs to provide log retrieval by job run ID and expose metrics for Prometheus scraping.

Key Technical Decisions

1. Storage Strategy: On-Disk with BEL Separation

Decision: Store structured logs on disk separate from the Build Event Log (BEL).

Motivation:

  • Log volumes can be legitimately large and would place undue stress on the BEL-backing datastore
  • BEL is optimized for event-sourcing patterns, not high-volume log queries
  • Separate storage allows independent scaling and retention policies

2. File Organization: Date-Organized Structure

Decision: Store logs in configurable-base, date-organized directories: $LOGS_BASEPATH/YYYY-MM-DD/{job_run_id}.jsonl

Motivation:

  • Enables efficient cleanup by date (future optimization)
  • Simplifies manual log management during development
  • Facilitates external log collection tools (future)

3. Static Update Period (Phase 1)

Decision: Use fixed refresh interval for log processing. Adaptive batching is a future optimization.

Motivation:

  • Simplicity for initial implementation
  • Predictable performance characteristics
  • Easier to debug and test
  • Can optimize later based on real usage patterns

4. Manual Log Cleanup (Phase 1)

Decision: No automatic log retention/cleanup in initial implementation.

Motivation:

  • We're in early development phase
  • Manual cleanup acceptable for now
  • Avoids complexity in initial implementation
  • Automatic retention can be added as future optimization

5. Unified Telemetry Stream

Decision: All JobLogEntry messages (logs, metrics, events) flow through the same JSONL files.

Motivation:

  • Simplicity - single consumption pipeline
  • Temporal consistency - metrics and logs naturally correlated
  • Unified file format reduces complexity

6. Cardinality-Safe Prometheus Metrics

Decision: Prometheus metrics will NOT include partition references as labels to avoid cardinality explosion.

Motivation:

  • Partition labels (date × customer × region × etc.) would create massive cardinality
  • Focus on job-level and system-level metrics only
  • Use job_id and job_type labels instead of partition-specific labels

7. Centralized Metric Templates for Correctness

Decision: Define all Prometheus metric names and label templates in a central location to avoid string duplication.

Motivation:

  • Prevents implicit coupling via duplicated string templates
  • Single source of truth for metric definitions
  • Easier to maintain consistency across codebase

8. Limited Scope (Phase 1)

Decision: Phase 1 focuses on log retrieval API and Prometheus metrics, excluding web app integration.

Motivation:

  • Web app integration is part of a bigger update
  • Allows focused implementation on core log consumption
  • API-first approach enables multiple consumers

9. Unified Execution Paths

Decision: Both CLI and service builds produce identical BEL events and JSONL logs in the same locations.

Motivation:

  • Building with CLI then querying from service "just works"
  • Single source of truth for all build artifacts
  • Consistent behavior regardless of execution method
  • Simplifies debugging and operational workflows

Interface Issues to Fix

JobLogEntry Protobuf Update Required

The current JobLogEntry definition needs updates:

Current (INCORRECT):

message JobLogEntry {
  string partition_ref = 3;  // Single string
  // ...
}

Required (CORRECT):

message JobLogEntry {
  repeated PartitionRef outputs = 3;  // Multiple PartitionRef objects
  // ...
}

Rationale: Jobs produce multiple partitions, and we should use the proper PartitionRef type for consistency with other interfaces.

Architecture

Storage Layout

/logs/databuild/
├── 2025-01-27/
│   ├── job_run_123abc.jsonl
│   ├── job_run_456def.jsonl
│   └── ...
├── 2025-01-28/
│   └── ...

File Format (JSONL)

Each file contains one JSON object per line, representing a JobLogEntry:

{"timestamp":"2025-01-27T10:30:45Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":1,"content":{"job_event":{"event_type":"task_launched","metadata":{}}}}
{"timestamp":"2025-01-27T10:30:46Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":2,"content":{"log":{"level":"INFO","message":"Processing started","fields":{"rows":"1000"}}}}
{"timestamp":"2025-01-27T10:30:50Z","job_id":"job_run_123abc","outputs":[{"path":"s3://bucket/dataset/date=2025-01-27"}],"sequence_number":3,"content":{"metric":{"name":"rows_processed","value":1000,"labels":{"stage":"transform"},"unit":"count"}}}

Consumption Pipeline

Job Wrapper (stdout) → Graph Log Collector → JSONL Files
                                                        ↓
                                              Unified Log Access Layer
                                                   ↙        ↘
                                            Service API    CLI API
                                                   ↓
                                           Metrics Aggregator → /api/v1/metrics

Implementation Components

1. Log Collector [PHASE 1]

Responsibility: Consume job wrapper stdout and write to JSONL files.

struct LogCollector {
    logs_dir: PathBuf,              // /logs/databuild
    active_files: HashMap<String, File>,  // job_run_id -> file handle
}

impl LogCollector {
    fn consume_job_output(&mut self, job_run_id: &str, stdout: &mut BufReader<ChildStdout>) -> Result<()>;
    fn write_log_entry(&mut self, job_run_id: &str, entry: &JobLogEntry) -> Result<()>;
    fn ensure_date_directory(&self) -> Result<PathBuf>;
}

2. Unified Log Access Layer [PHASE 1]

Responsibility: Provide common interface for reading logs from JSONL files, used by both service and CLI.

// Core log access implementation
struct LogReader {
    logs_base_path: PathBuf,
}

impl LogReader {
    fn get_job_logs(&self, request: &JobLogsRequest) -> Result<JobLogsResponse>;
    fn list_available_jobs(&self, date_range: Option<(String, String)>) -> Result<Vec<String>>;
    fn get_job_metrics(&self, job_run_id: &str) -> Result<Vec<MetricPoint>>;
}

Protobuf Interface (ensures CLI/Service consistency):

message JobLogsRequest {
  string job_run_id = 1;
  int64 since_timestamp = 2;  // Unix timestamp (nanoseconds)
  int32 min_level = 3;        // LogLevel enum value
  uint32 limit = 4;
}

message JobLogsResponse {
  repeated JobLogEntry entries = 1;
  bool has_more = 2;
}

3. Metrics Templates [PHASE 1]

Responsibility: Centralized metric definitions to avoid string duplication.

// Central location for all metric definitions
mod metric_templates {
    pub const JOB_RUNTIME_SECONDS: &str = "databuild_job_runtime_seconds";
    pub const JOB_MEMORY_PEAK_MB: &str = "databuild_job_memory_peak_mb";
    pub const JOB_CPU_TOTAL_SECONDS: &str = "databuild_job_cpu_total_seconds";
    pub const ROWS_PROCESSED_TOTAL: &str = "databuild_rows_processed_total";
    
    pub fn job_labels(job_run_id: &str, job_type: &str) -> HashMap<String, String> {
        let mut labels = HashMap::new();
        labels.insert("job_run_id".to_string(), job_run_id.to_string());
        labels.insert("job_type".to_string(), job_type.to_string());
        labels
    }
}

4. Metrics Aggregator [PHASE 1]

Responsibility: Process MetricPoint messages and expose Prometheus format with safe cardinality.

struct MetricsAggregator {
    metrics: HashMap<String, MetricFamily>,
}

impl MetricsAggregator {
    fn ingest_metric(&mut self, metric: &MetricPoint, job_run_id: &str, job_type: &str);
    fn generate_prometheus_output(&self) -> String;
}

Safe Prometheus Output (NO partition labels):

# HELP databuild_job_runtime_seconds Job execution time in seconds  
# TYPE databuild_job_runtime_seconds gauge
databuild_job_runtime_seconds{job_run_id="job_run_123abc"} 45.2

# HELP databuild_rows_processed_total Total rows processed by job
# TYPE databuild_rows_processed_total counter  
databuild_rows_processed_total{job_run_id="job_run_123abc"} 1000

API Implementation

REST Endpoints [PHASE 1]

Get Job Logs:

GET /api/v1/jobs/{job_run_id}/logs?since={timestamp}&level={log_level}

Response: Array of LogEntry objects with filtering support.

Prometheus Metrics Scraping:

GET /api/v1/metrics

Response: All metrics in Prometheus exposition format.

Configuration

Environment Variables

# Log storage configuration
DATABUILD_LOGS_DIR=/logs/databuild                    # Log storage directory

# Processing configuration  
DATABUILD_LOG_REFRESH_INTERVAL_MS=1000                # Fixed refresh interval (1s)
DATABUILD_LOG_CACHE_SIZE=100                          # LRU cache size for job logs

Implementation Phases

Phase 1: Core Implementation [COMPLETED ]

Goal: Basic log consumption and storage with REST API for log retrieval and Prometheus metrics.

Deliverables :

  • Fix JobLogEntry protobuf interface (partition_ref → outputs)
  • LogCollector with JSONL file writing and graph-level job_label enrichment
  • LogReader with unified protobuf interface for CLI/Service consistency
  • REST API endpoints for job logs and Prometheus metrics
  • MetricsAggregator with cardinality-safe output (job labels, not partition refs)
  • Centralized metric templates module

Success Criteria :

  • Job logs are captured and stored reliably via LogCollector integration
  • REST API can retrieve logs by job run ID and time range with filtering
  • Prometheus metrics are exposed at /api/v1/metrics endpoint without cardinality issues
  • System handles concurrent job execution without data corruption (separate JSONL files per job)
  • All metric names/labels are defined in central location (metric_templates.rs)

Phase 2: Advanced Features [FUTURE]

Goal: Performance optimizations and production features.

Deliverables:

  • Adaptive batching based on system load
  • Automatic log retention and cleanup
  • Web app integration for log viewing
  • Rate limiting for high-volume jobs
  • Performance monitoring and alerting

Testing Strategy

Core Tests (90% Coverage, Maximum Simplicity) [PHASE 1]

Unit Tests:

  • JSONL parsing and serialization (basic happy path)
  • Metrics aggregation and Prometheus formatting (template correctness)
  • API endpoint responses (log retrieval by job_run_id)

Integration Tests:

  • End-to-end: wrapper stdout → JSONL file → API response
  • Concurrent job log collection (2-3 jobs simultaneously)
  • Prometheus metrics scraping endpoint

Key Principle: Tests should be simple and focus on core workflows. Avoid testing edge cases that may change as requirements evolve.

Future Extensions

Performance Optimizations [FUTURE]

  • Adaptive refresh intervals based on load
  • Log compression for storage efficiency
  • Advanced caching strategies

Production Features [FUTURE]

  • Automatic log retention and cleanup policies
  • Integration with external log collection tools
  • Web app log viewing and search capabilities

Monitoring Integration [FUTURE]

  • Grafana dashboard templates
  • Alerting on log system health
  • Performance metrics for log processing pipeline

Success Criteria

  1. Reliable Log Capture: All job wrapper output captured without loss
  2. API Functionality: Can retrieve logs by job run ID and time range
  3. Safe Metrics: Prometheus endpoint works without cardinality explosion
  4. Correctness: No duplicated metric templates, centralized definitions
  5. Concurrent Safety: Multiple jobs can write logs simultaneously without corruption
  6. Simple Testing: Test suite covers core functionality with minimal brittleness