# Job Wrapper v2 Plan

## Status

- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [MOSTLY DONE - heartbeating and metrics implemented]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]

## Required Reading

Before implementing this plan, engineers should thoroughly understand these design documents:

- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures

## Overview

The job wrapper is a critical component that mediates between DataBuild graphs and job executables, providing observability, error handling, and state management. This plan describes the next-generation job wrapper implementation in Rust.

## Architecture

### Core Design Principles

1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs [DONE]
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms [PARTIAL - local only]
3. **Zero Network Requirements**: Jobs don't need to connect to any services [DONE]
4. **Fail-Safe**: Graceful handling of job crashes and fast completions [PARTIAL - basic handling only]

### Communication Model

```
Graph → Job: Launch with JobConfig (via CLI args/env)
Job → Graph: Structured logs (stdout)
Graph: Tails logs and interprets into metrics, events, and manifests
```

## Structured Log Protocol

### Message Format (Protobuf) [DONE]

```proto
message JobLogEntry {
  string timestamp = 1;
  string job_id = 2;
  string partition_ref = 3;
  uint64 sequence_number = 4; // Monotonic sequence starting from 1 [DONE]

  oneof content {
    LogMessage log = 5;             // [DONE]
    MetricPoint metric = 6;         // [FUTURE]
    JobEvent event = 7;             // [DONE - WrapperJobEvent]
    PartitionManifest manifest = 8; // [DONE]
  }
}

message LogMessage { // [DONE]
  enum LogLevel {
    DEBUG = 0;
    INFO = 1;
    WARN = 2;
    ERROR = 3;
  }
  LogLevel level = 1;
  string message = 2;
  map<string, string> fields = 3;
}

message MetricPoint { // [FUTURE]
  string name = 1;
  double value = 2;
  map<string, string> labels = 3;
  string unit = 4;
}

message JobEvent { // [DONE - as WrapperJobEvent]
  string event_type = 1; // "task_launched", "heartbeat", "task_completed", etc.
  google.protobuf.Any details = 2;
  map<string, string> metadata = 3;
}
```

### Log Stream Lifecycle

1. Wrapper validates configuration [DONE]
2. Wrapper emits `config_validate_success` event (sequence #1) [DONE]
3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
4. Job executes; wrapper captures stdout/stderr (sequence #3+) [DONE]
5. Wrapper emits periodic `heartbeat` events (every 30s) [DONE]
6. Wrapper detects job completion [DONE]
7. Wrapper emits `task_success`/`task_failed` event [DONE]
8. Wrapper emits `PartitionManifest` message (the final required message, with the highest sequence number) [DONE]
9. Wrapper exits [DONE]

The PartitionManifest serves as the implicit end-of-logs marker: the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection. [DONE - sequence numbers implemented]
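To make this contract concrete, here is a minimal sketch of the graph-side check the lifecycle implies. The types are informal stand-ins for the protobuf messages above, and `consume` is a hypothetical helper, not the actual graph implementation:

```rust
/// Informal mirror of the JobLogEntry oneof, for illustration only.
enum Content {
    Log,
    Metric,
    Event,
    Manifest,
}

struct JobLogEntry {
    sequence_number: u64,
    content: Content,
}

/// Validate a job's log stream: sequence numbers must be contiguous from 1,
/// and the stream is complete once a PartitionManifest entry arrives.
fn consume(entries: impl Iterator<Item = JobLogEntry>) -> Result<(), String> {
    let mut expected = 1u64;
    for entry in entries {
        if entry.sequence_number != expected {
            return Err(format!(
                "gap detected: expected seq {expected}, got {}",
                entry.sequence_number
            ));
        }
        expected += 1;
        if matches!(entry.content, Content::Manifest) {
            return Ok(()); // manifest is the implicit end-of-logs marker
        }
    }
    Err("stream ended without a PartitionManifest".into())
}
```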
## Wrapper Implementation [PARTIAL]

### Interfaces [DONE]

```rust
trait JobWrapper {
    // Config mode - accepts PartitionRef objects
    fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>; // [DONE]

    // Exec mode - accepts a serialized JobConfig
    fn exec(config: JobConfig) -> Result<()>; // [DONE]
}
```

### Exit Code Standards [PARTIAL]

Following POSIX conventions and avoiding collisions with standard exit codes.

References:
- https://manpages.ubuntu.com/manpages/noble/man3/sysexits.h.3head.html
- https://tldp.org/LDP/abs/html/exitcodes.html

```rust
// Standard POSIX codes we respect: [PARTIAL - basic forwarding only]
// 0  - Success
// 1  - General error
// 2  - Misuse of shell builtin
// 64 - Command line usage error (EX_USAGE)
// 65 - Data format error (EX_DATAERR)
// 66 - Cannot open input (EX_NOINPUT)
// 69 - Service unavailable (EX_UNAVAILABLE)
// 70 - Internal software error (EX_SOFTWARE)
// 71 - System error (EX_OSERR)
// 73 - Can't create output file (EX_CANTCREAT)
// 74 - Input/output error (EX_IOERR)
// 75 - Temp failure; retry (EX_TEMPFAIL)
// 77 - Permission denied (EX_NOPERM)
// 78 - Configuration error (EX_CONFIG)

// DataBuild-specific codes (100+ to avoid collisions): [FUTURE]
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures
// 120-129 - User-defined resource failures
// 130+    - Other user-defined codes

enum ExitCodeCategory { // [FUTURE]
    Success,          // 0
    StandardError,    // 1-63 (shell/system)
    PosixError,       // 64-78 (sysexits.h)
    TransientFailure, // 75 (EX_TEMPFAIL) or 110-119
    UserDefined,      // 100+
}
```

## Platform-Specific Log Handling

### Local Execution [DONE]

- Graph spawns wrapper process [DONE]
- Graph reads from stdout pipe directly [DONE]
- PartitionManifest indicates completion [DONE]

### Docker [FUTURE]

- Graph runs `docker run` with the wrapper as entrypoint
- Graph uses `docker logs -f` to tail output
- Logs persist after container exit

### Kubernetes [FUTURE]

- Job pods use the wrapper as container entrypoint
- Graph tails logs via the K8s API
- Configure `terminationGracePeriodSeconds` for log retention

### Cloud Run / Lambda [FUTURE]

- Wrapper logs to the platform logging service
- Graph queries logs via the platform API
- Natural buffering and persistence

## Observability Features

### Metrics Collection [FUTURE]

For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose in Prometheus format:

```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "metric": {
      "name": "rows_processed",
      "value": 1500000,
      "labels": {
        "partition": "date=2025-01-27",
        "stage": "transform"
      },
      "unit": "count"
    }
  }
}
```

The graph component will: [FUTURE]

- Aggregate metrics from job logs (see the sketch after this section)
- Expose them in Prometheus format for scraping (when running as a service)
- Store summary metrics in the BEL for historical analysis

For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping, which is acceptable since these are typically one-off runs.
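A sketch of that aggregation step, assuming counter-style metrics are summed per label set (gauges and histograms would need different folds). `MetricPoint` here is an informal stand-in for the proto message, and `aggregate` is a hypothetical helper:

```rust
use std::collections::BTreeMap;

/// Informal stand-in for the MetricPoint proto message above.
struct MetricPoint {
    name: String,
    value: f64,
    labels: BTreeMap<String, String>, // BTreeMap gives a stable label order
}

/// Fold metric points into per-series sums keyed by name plus labels,
/// roughly the shape a Prometheus text exposition needs.
fn aggregate(points: impl Iterator<Item = MetricPoint>) -> BTreeMap<String, f64> {
    let mut series = BTreeMap::new();
    for p in points {
        let labels: Vec<String> = p
            .labels
            .iter()
            .map(|(k, v)| format!("{k}=\"{v}\""))
            .collect();
        // e.g. rows_processed{partition="date=2025-01-27",stage="transform"}
        let key = format!("{}{{{}}}", p.name, labels.join(","));
        *series.entry(key).or_insert(0.0) += p.value;
    }
    series
}
```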
### Heartbeating [DONE]

Fixed 30-second heartbeat interval (configurable via `DATABUILD_HEARTBEAT_INTERVAL_MS`):

```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "JobEvent": {
      "event_type": "heartbeat",
      "metadata": {
        "memory_usage_mb": "1024.256",
        "cpu_usage_percent": "85.200"
      }
    }
  }
}
```

**Implementation Details:**

- Uses the sysinfo crate for cross-platform process monitoring
- Heartbeat thread communicates with the main thread via channels
- Includes memory usage (MB) and CPU usage (%) with 3-decimal precision
- Configurable interval for testing (default 30s; test environments use 100ms)
- Dependency injection via the LogSink trait for testability

### Log Bandwidth Limits [FUTURE]

To prevent log flooding:

- Maximum log rate: 1000 messages/second
- Maximum message size: 1 MB
- If limits are exceeded: the wrapper emits a rate-limit warning and drops messages
- Final metrics report the dropped-message count

## Testing Strategy

### Unit Tests [MOSTLY DONE]

- Log parsing and serialization [DONE - protobuf serde]
- State machine transitions [DONE - JobLogEntry sequence validation]
- Heartbeat functionality [DONE - with dependency injection]
- CPU/memory metrics collection [DONE - with configurable intervals]
- Exit code categorization [FUTURE]
- Rate limiting behavior [FUTURE]

### Integration Tests [PARTIAL]

- Full job execution lifecycle [DONE - via e2e tests]
- Resource metrics validation [DONE - CPU-intensive workload testing]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]

### Platform Tests [PARTIAL]

- Local process execution [DONE]
- Docker container runs [FUTURE]
- Kubernetes job pods [FUTURE]
- Cloud Run invocations [FUTURE]

### Failure Scenario Tests [PARTIAL]

- Job crashes (SIGSEGV, SIGKILL) [DONE - basic exit code forwarding]
- Wrapper crashes [FUTURE]
- Log tailing interruptions [FUTURE]
- Platform-specific failures [FUTURE]

## Implementation Phases

### Phase 0: Minimal Bootstrap [DONE]

Implement the absolute minimum to unblock development and testing:

- Basic wrapper that only handles the happy path [DONE] (see the exec-flow sketch after this section)
- Support for local execution only [DONE]
- Minimal log parsing in the graph [DONE - wrapper emits structured logs]
- Integration with existing example jobs [DONE - e2e tests passing]

This phase delivers a working end-to-end system that can be continuously evolved. [DONE]
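A minimal sketch of that happy path, with hypothetical `emit_*` helpers standing in for the wrapper's actual JobLogEntry serialization:

```rust
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};

// Hypothetical emit helpers: in the real wrapper these serialize
// JobLogEntry messages (with sequence numbers) to stdout.
fn emit_log_message(_line: &str) { /* ... */ }
fn emit_manifest() { /* ... */ }

/// Happy-path exec flow: spawn the job, forward each stdout line as a
/// LogMessage entry, emit the PartitionManifest on clean exit, and
/// propagate the job's exit code.
fn exec_happy_path(job: &str, args: &[&str]) -> std::io::Result<i32> {
    let mut child = Command::new(job)
        .args(args)
        .stdout(Stdio::piped())
        .spawn()?;
    let stdout = child.stdout.take().expect("stdout was piped above");
    for line in BufReader::new(stdout).lines() {
        emit_log_message(&line?);
    }
    let status = child.wait()?;
    if status.success() {
        emit_manifest(); // final message with the highest sequence number
    }
    Ok(status.code().unwrap_or(1))
}
```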
**Completed Implementation Details:**

- Created databuild/job/src/main.rs with config/exec modes [DONE]
- Uses protobuf types from databuild.proto [DONE]
- Emits JobLogEntry with sequence numbers [DONE]
- Follows the core-build.md state diagram exactly [DONE]
- Forwards job stdout/stderr as LogMessage entries [DONE]
- Emits PartitionManifest on successful completion [DONE]
- Properly handles job failures with exit codes [DONE]
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]

### Phase 1: Core Protocol [MOSTLY DONE]

- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [DONE - with CPU/memory metrics]
- Resource metrics collection [DONE - CPU time, peak memory, runtime]
- Dependency injection for testability [DONE - LogSink trait pattern]
- Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]

### Phase 2: Platform Support [FUTURE]

- Docker integration [FUTURE]
- Kubernetes support [FUTURE]
- Cloud platform adapters [FUTURE]
- Platform-specific testing [FUTURE]

### Phase 3: Production Hardening [FUTURE]

- Rate limiting [FUTURE]
- Error recovery [FUTURE]
- Performance optimization [FUTURE]
- Monitoring integration [FUTURE]

### Phase 4: Advanced Features [FUTURE]

- In-process config for library jobs [FUTURE]
- Custom metrics backends [FUTURE]
- Advanced failure analysis [FUTURE]

## Success Criteria

1. **Zero Network Dependencies**: Jobs run without any network access [DONE]
2. **Platform Parity**: Identical behavior across all execution platforms [PARTIAL - local only]
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec [DONE - fast execution]
4. **Complete Observability**: All job state changes captured in logs [DONE - core events captured]
5. **Graceful Failures**: No log data loss, even in crash scenarios [PARTIAL - basic failure handling]

## Next Steps

1. Implement minimal bootstrap wrapper [DONE]
2. Test with existing example jobs [DONE]
3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
4. Gradually add features per the implementation phases [IN PROGRESS]

**Phase 1 Achievements:**

- ✅ Heartbeating support with CPU/memory metrics [DONE]
- ✅ Dependency injection for testability (LogSink trait) [DONE]
- ✅ Resource metrics collection (CPU time, peak memory, runtime) [DONE]
- ✅ Comprehensive test coverage for heartbeats and metrics [DONE]
- ✅ Configurable intervals for different environments [DONE]

**Remaining for Phase 1 Completion:**

- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]

**Recent Implementation Details:**

- Uses sysinfo 0.30 for cross-platform process monitoring
- Thread-safe heartbeat communication via mpsc channels
- Floating-point metrics with 3-decimal precision (f64)
- Environment variable configuration: `DATABUILD_HEARTBEAT_INTERVAL_MS`, `DATABUILD_METRICS_INTERVAL_MS`
- Robust test infrastructure with synthetic CPU-intensive workloads
- Proper CPU time calculation: (average_cpu_percent / 100.0) × wall_clock_time (see the sketch below)
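A sketch of how the sampling and the CPU-time formula might fit together, assuming sysinfo 0.30's API (methods directly on `System`/`Process`, with `memory()` reporting bytes); `sample_process` and `cpu_time_secs` are illustrative names, not the wrapper's actual functions:

```rust
use std::time::Instant;
use sysinfo::{Pid, System};

/// Sample a process's stats for a heartbeat event. The caller holds a
/// long-lived `System::new_all()`; note that cpu_usage() is only
/// meaningful from the second refresh onward.
fn sample_process(sys: &mut System, pid: u32) -> Option<(f64, f64)> {
    let pid = Pid::from_u32(pid);
    sys.refresh_process(pid);
    let p = sys.process(pid)?;
    let mem_mb = p.memory() as f64 / (1024.0 * 1024.0); // memory() is bytes in 0.30
    let cpu_pct = p.cpu_usage() as f64;
    // Heartbeat metadata carries 3-decimal precision.
    Some((
        (mem_mb * 1000.0).round() / 1000.0,
        (cpu_pct * 1000.0).round() / 1000.0,
    ))
}

/// CPU time per the formula above:
/// (average CPU percent / 100.0) x wall-clock time.
fn cpu_time_secs(cpu_samples: &[f64], started: Instant) -> f64 {
    if cpu_samples.is_empty() {
        return 0.0;
    }
    let avg = cpu_samples.iter().sum::<f64>() / cpu_samples.len() as f64;
    (avg / 100.0) * started.elapsed().as_secs_f64()
}
```

In the actual wrapper, per the details above, these samples travel from the heartbeat thread to the main thread over an mpsc channel and are emitted through the LogSink trait.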