databuild/plans/job-wrapper.md

Job Wrapper v2 Plan

Overview

The job wrapper is a critical component that mediates between DataBuild graphs and job executables, providing observability, error handling, and state management. This plan describes the next-generation job wrapper implementation in Rust.

Architecture

Core Design Principles

  1. Single Communication Channel: Jobs communicate with graphs exclusively through structured logs
  2. Platform Agnostic: Works identically across local, Docker, K8s, and cloud platforms
  3. Zero Network Requirements: Jobs don't need to connect to any services
  4. Fail-Safe: Graceful handling of job crashes and fast completions

Communication Model

Graph → Job: Launch with JobConfig (via CLI args/env)
Job → Graph: Structured logs (stdout)
Graph: Tails logs and interprets them into metrics, events, and manifests
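
As an illustration of the wrapper side of this channel, the sketch below reads a JobConfig that the graph has serialized into an environment variable. The variable name, the JSON encoding, and the JobConfig fields shown are assumptions for the example, not a settled interface; serde/serde_json are assumed for deserialization.

use std::env;

// Hypothetical, simplified JobConfig for illustration; the real message is
// defined by DataBuild's schemas.
#[derive(Debug, serde::Deserialize)]
struct JobConfig {
    job_id: String,
    partition_ref: String,
}

fn load_job_config() -> Result<JobConfig, Box<dyn std::error::Error>> {
    // Assumption: the graph passes the JobConfig as JSON in this env var
    // (it could equally arrive as a CLI argument).
    let raw = env::var("DATABUILD_JOB_CONFIG")?;
    let config = serde_json::from_str(&raw)?;
    Ok(config)
}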

Structured Log Protocol

Message Format (Protobuf)

message JobLogEntry {
  string timestamp = 1;
  string job_id = 2;
  string partition_ref = 3;
  uint64 sequence_number = 4;  // Monotonic sequence starting from 1
  
  oneof content {
    LogMessage log = 5;
    MetricPoint metric = 6;
    JobEvent event = 7;
    PartitionManifest manifest = 8;
  }
}

message LogMessage {
  enum LogLevel {
    DEBUG = 0;
    INFO = 1;
    WARN = 2;
    ERROR = 3;
  }
  LogLevel level = 1;
  string message = 2;
  map<string, string> fields = 3;
}

message MetricPoint {
  string name = 1;
  double value = 2;
  map<string, string> labels = 3;
  string unit = 4;
}

message JobEvent {
  string event_type = 1;  // "task_launched", "heartbeat", "task_completed", etc.
  google.protobuf.Any details = 2;
  map<string, string> metadata = 3;
}

Log Stream Lifecycle

  1. Wrapper emits job_config_started event (sequence #1)
  2. Wrapper validates configuration
  3. Wrapper emits task_launched event (sequence #2)
  4. Job executes, wrapper captures stdout/stderr (sequence #3+)
  5. Wrapper emits periodic heartbeat events (every 30s)
  6. Wrapper detects job completion
  7. Wrapper emits PartitionManifest message (final required message with highest sequence number)
  8. Wrapper exits

The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection.
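
A minimal sketch of the wrapper-side emitter that enforces this contract: one JSON object per stdout line with a monotonically increasing sequence number. This is the Phase 0 JSON form rather than the protobuf encoding, and it assumes the serde_json and chrono crates; the struct is a simplified stand-in for JobLogEntry.

use std::io::{self, Write};

// Simplified stand-in for JobLogEntry; the `content` payload is left as an
// arbitrary JSON value here.
#[derive(serde::Serialize)]
struct LogEntry<'a> {
    timestamp: String,
    job_id: &'a str,
    partition_ref: &'a str,
    sequence_number: u64,
    content: serde_json::Value,
}

struct StructuredLogger {
    job_id: String,
    partition_ref: String,
    next_seq: u64, // monotonic, starts at 1
}

impl StructuredLogger {
    fn emit(&mut self, content: serde_json::Value) -> io::Result<()> {
        let entry = LogEntry {
            timestamp: chrono::Utc::now().to_rfc3339(),
            job_id: &self.job_id,
            partition_ref: &self.partition_ref,
            sequence_number: self.next_seq,
            content,
        };
        self.next_seq += 1;
        // One JSON object per line on stdout; the graph tails and parses these.
        let line = serde_json::to_string(&entry).expect("log entry serializes");
        writeln!(io::stdout().lock(), "{line}")
    }
}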

Wrapper Implementation

Interfaces

// `Result` is a wrapper-level alias (e.g. anyhow::Result or Result<T, Box<dyn Error>>).
trait JobWrapper {
    // Config mode - accepts the requested output PartitionRefs and produces a JobConfig
    fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>;
    
    // Exec mode - accepts the deserialized JobConfig and runs the job
    fn exec(config: JobConfig) -> Result<()>;
}
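
As a usage sketch, a wrapper binary built on this trait could dispatch between the two modes from its first CLI argument. `MyWrapper`, `parse_partition_refs_from_args`, and the JSON output of config mode are illustrative assumptions; `load_job_config` refers to the env-var sketch above.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mode = std::env::args().nth(1).unwrap_or_default();
    match mode.as_str() {
        "config" => {
            // Hypothetical helper that parses PartitionRefs from the remaining args.
            let outputs = parse_partition_refs_from_args()?;
            let job_config = MyWrapper::config(outputs)?;
            // Assumes JobConfig implements serde::Serialize.
            println!("{}", serde_json::to_string(&job_config)?);
        }
        "exec" => {
            let job_config = load_job_config()?; // e.g. from env, as sketched earlier
            MyWrapper::exec(job_config)?;
        }
        other => return Err(format!("unknown mode: {other}").into()),
    }
    Ok(())
}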

Exit Code Standards

Following POSIX conventions and avoiding collisions with standard exit codes:

Reference:

// Standard POSIX codes we respect:
// 0   - Success
// 1   - General error
// 2   - Misuse of shell builtin
// 64  - Command line usage error (EX_USAGE)
// 65  - Data format error (EX_DATAERR)
// 66  - Cannot open input (EX_NOINPUT)
// 69  - Service unavailable (EX_UNAVAILABLE)
// 70  - Internal software error (EX_SOFTWARE)
// 71  - System error (EX_OSERR)
// 73  - Can't create output file (EX_CANTCREAT)
// 74  - Input/output error (EX_IOERR)
// 75  - Temp failure; retry (EX_TEMPFAIL)
// 77  - Permission denied (EX_NOPERM)
// 78  - Configuration error (EX_CONFIG)

// DataBuild-specific codes (100+ to avoid collisions):
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures  
// 120-129 - User-defined resource failures
// 130+    - Other user-defined codes

enum ExitCodeCategory {
    Success,          // 0
    StandardError,    // 1-63 (shell/system)
    PosixError,       // 64-78 (sysexits.h)
    TransientFailure, // 75 (EX_TEMPFAIL) or 110-119
    UserDefined,      // 100+
}
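
A sketch of the categorization this table implies. The precedence (checking the transient bands before the general sysexits range, and treating the unassigned 79-99 band as a standard error) is an assumption of this example:

fn categorize_exit_code(code: i32) -> ExitCodeCategory {
    match code {
        0 => ExitCodeCategory::Success,
        // Transient wins over the broader ranges it overlaps with.
        75 | 110..=119 => ExitCodeCategory::TransientFailure,
        64..=78 => ExitCodeCategory::PosixError,            // sysexits.h
        100..=109 | 120.. => ExitCodeCategory::UserDefined, // permanent / resource / other user codes
        _ => ExitCodeCategory::StandardError,               // 1-63 and unassigned codes
    }
}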

Platform-Specific Log Handling

Local Execution

  • Graph spawns wrapper process
  • Graph reads from stdout pipe directly
  • PartitionManifest indicates completion
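
A sketch of the graph side for local execution, using only the standard library; the log-interpretation hook is left as a hypothetical function:

use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};

fn run_local_job(wrapper_bin: &str, args: &[&str]) -> std::io::Result<()> {
    let mut child = Command::new(wrapper_bin)
        .args(args)
        .stdout(Stdio::piped())
        .spawn()?;

    // Tail the wrapper's stdout line by line; each line is one structured log entry.
    let stdout = child.stdout.take().expect("stdout was piped");
    for line in BufReader::new(stdout).lines() {
        let line = line?;
        // interpret_log_entry(&line);  // hypothetical graph-side parser
        let _ = line;
    }

    // The stream ends when the wrapper exits; the PartitionManifest entry
    // (highest sequence number) marks completion.
    let status = child.wait()?;
    println!("wrapper exited with {status}");
    Ok(())
}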

Docker

  • Graph runs docker run with wrapper as entrypoint
  • Graph uses docker logs -f to tail output
  • Logs persist after container exit

Kubernetes

  • Job pods use wrapper as container entrypoint
  • Graph tails logs via K8s API
  • Configure terminationGracePeriodSeconds for log retention

Cloud Run / Lambda

  • Wrapper logs to platform logging service
  • Graph queries logs via platform API
  • Natural buffering and persistence

Observability Features

Metrics Collection

For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose in Prometheus format:

{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "metric": {
      "name": "rows_processed",
      "value": 1500000,
      "labels": {
        "partition": "date=2025-01-27",
        "stage": "transform"
      },
      "unit": "count"
    }
  }
}

The graph component will:

  • Aggregate metrics from job logs
  • Expose them in Prometheus format for scraping (when running as a service)
  • Store summary metrics in the BEL for historical analysis

For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
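
A rough sketch of graph-side aggregation, summing metric points by name and label set and rendering the result in the Prometheus text exposition format. The summing policy is an assumption; counters and gauges would need different treatment in practice.

use std::collections::HashMap;

// Key a metric by its name plus a sorted, rendered label set.
type MetricKey = (String, String);

#[derive(Default)]
struct MetricAggregator {
    totals: HashMap<MetricKey, f64>,
}

impl MetricAggregator {
    fn record(&mut self, name: &str, value: f64, labels: &HashMap<String, String>) {
        let mut pairs: Vec<_> = labels.iter().collect();
        pairs.sort();
        let rendered: Vec<String> = pairs
            .iter()
            .map(|(k, v)| format!("{k}=\"{v}\""))
            .collect();
        let key = (name.to_string(), rendered.join(","));
        *self.totals.entry(key).or_insert(0.0) += value;
    }

    // Render in Prometheus text exposition format, e.g.
    // rows_processed{partition="date=2025-01-27",stage="transform"} 1500000
    fn render(&self) -> String {
        let mut out = String::new();
        for ((name, labels), value) in &self.totals {
            out.push_str(&format!("{name}{{{labels}}} {value}\n"));
        }
        out
    }
}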

Heartbeating

Fixed 30-second heartbeat interval (based on Kubernetes best practices):

{
  "timestamp": "2025-01-27T10:30:45Z", 
  "content": {
    "event": {
      "event_type": "heartbeat",
      "metadata": {
        "memory_usage_mb": "1024",
        "cpu_usage_percent": "85.2"
      }
    }
  }
}
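
A sketch of the wrapper's heartbeat loop on a background thread, reusing the StructuredLogger sketched earlier behind a Mutex. The resource figures are placeholders; real sampling would need something like the sysinfo crate.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// The main thread sets `done` once the job exits, then joins the returned handle.
fn spawn_heartbeat(
    logger: Arc<Mutex<StructuredLogger>>,
    done: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !done.load(Ordering::Relaxed) {
            let event = serde_json::json!({
                "event": {
                    "event_type": "heartbeat",
                    // Placeholder values; real sampling is out of scope for this sketch.
                    "metadata": { "memory_usage_mb": "0", "cpu_usage_percent": "0" }
                }
            });
            let _ = logger.lock().unwrap().emit(event);
            thread::sleep(Duration::from_secs(30));
        }
    })
}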

Log Bandwidth Limits

To prevent log flooding:

  • Maximum log rate: 1000 messages/second
  • Maximum message size: 1MB
  • If limits are exceeded: the wrapper emits a rate-limit warning and drops messages
  • Final metrics include the dropped-message count
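
A minimal sketch of the per-second limit with a dropped-message counter, using a fixed one-second window (a token bucket would smooth bursts better; the choice here is illustrative, and the 1MB size check would be a separate test on the serialized message):

use std::time::Instant;

struct LogRateLimiter {
    max_per_second: u64, // e.g. 1000
    window_start: Instant,
    sent_in_window: u64,
    dropped_total: u64,
}

impl LogRateLimiter {
    fn new(max_per_second: u64) -> Self {
        Self {
            max_per_second,
            window_start: Instant::now(),
            sent_in_window: 0,
            dropped_total: 0,
        }
    }

    fn allow(&mut self) -> bool {
        if self.window_start.elapsed().as_secs() >= 1 {
            // New one-second window; reset the counter.
            self.window_start = Instant::now();
            self.sent_in_window = 0;
        }
        if self.sent_in_window < self.max_per_second {
            self.sent_in_window += 1;
            true
        } else {
            // Over budget: drop the message and remember how many were dropped
            // so the final metrics can report the count.
            self.dropped_total += 1;
            false
        }
    }
}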

Testing Strategy

Unit Tests

  • Log parsing and serialization
  • Exit code categorization
  • Rate limiting behavior
  • State machine transitions

Integration Tests

  • Full job execution lifecycle
  • Platform-specific log tailing
  • Fast job completion handling
  • Large log volume handling

Platform Tests

  • Local process execution
  • Docker container runs
  • Kubernetes job pods
  • Cloud Run invocations

Failure Scenario Tests

  • Job crashes (SIGSEGV, SIGKILL)
  • Wrapper crashes
  • Log tailing interruptions
  • Platform-specific failures

Implementation Phases

Phase 0: Minimal Bootstrap

Implement the absolute minimum to unblock development and testing:

  • Simple JSON-based logging (no protobuf yet)
  • Basic wrapper that only handles happy path
  • Support for local execution only
  • Minimal log parsing in graph
  • Integration with existing example jobs

This phase delivers a working end-to-end system that can be continuously evolved.

Phase 1: Core Protocol

  • Define protobuf schemas
  • Implement structured logger
  • Add error handling and exit codes
  • Implement heartbeating
  • Graph-side log parser improvements

Phase 2: Platform Support

  • Docker integration
  • Kubernetes support
  • Cloud platform adapters
  • Platform-specific testing

Phase 3: Production Hardening

  • Rate limiting
  • Error recovery
  • Performance optimization
  • Monitoring integration

Phase 4: Advanced Features

  • In-process config for library jobs
  • Custom metrics backends
  • Advanced failure analysis

Success Criteria

  1. Zero Network Dependencies: Jobs run without any network access
  2. Platform Parity: Identical behavior across all execution platforms
  3. Minimal Overhead: < 100ms wrapper overhead for config, < 1s for exec
  4. Complete Observability: All job state changes captured in logs
  5. Graceful Failures: No log data loss even in crash scenarios

Next Steps

  1. Implement minimal bootstrap wrapper
  2. Test with existing example jobs
  3. Iterate on log format based on real usage
  4. Gradually add features per implementation phases