Job Wrapper v2 Plan
Status
- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [MOSTLY DONE - heartbeating and metrics implemented]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]
Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
- DESIGN.md - Overall DataBuild architecture and job execution model
- design/core-build.md - Core build semantics and job lifecycle state machines
- design/observability.md - Observability strategy and telemetry requirements
- design/build-event-log.md - Event sourcing model and BEL integration
- databuild.proto - System interfaces and data structures
Overview
The job wrapper is a critical component that mediates between DataBuild graphs and job executables, providing observability, error handling, and state management. This plan describes the next-generation job wrapper, implemented in Rust.
Architecture
Core Design Principles
- Single Communication Channel: Jobs communicate with graphs exclusively through structured logs [DONE]
- Platform Agnostic: Works identically across local, Docker, K8s, and cloud platforms [PARTIAL - local only]
- Zero Network Requirements: Jobs don't need to connect to any services [DONE]
- Fail-Safe: Graceful handling of job crashes and fast completions [PARTIAL - basic handling only]
Communication Model
Graph → Job: Launch with JobConfig (via CLI args/env)
Job → Graph: Structured logs (stdout)
Graph: Tails logs and interprets into metrics, events, and manifests
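As a rough illustration of this model, the sketch below shows the graph side: spawn the wrapper, pass the serialized JobConfig as an argument, and tail stdout line by line. The `./job_wrapper` path and the exact argument shape are assumptions for illustration, not the wrapper's actual CLI.

```rust
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};

fn run_job(job_config_json: &str) -> std::io::Result<i32> {
    let mut child = Command::new("./job_wrapper")
        .arg("exec")
        .arg(job_config_json)
        .stdout(Stdio::piped()) // stdout is the single communication channel
        .spawn()?;

    // Each stdout line is one serialized JobLogEntry; the graph interprets
    // these into metrics, events, and the final PartitionManifest.
    let reader = BufReader::new(child.stdout.take().expect("stdout piped"));
    for line in reader.lines() {
        let entry = line?;
        // Parse the JobLogEntry here (see the protobuf schema below).
        println!("{entry}");
    }
    Ok(child.wait()?.code().unwrap_or(-1))
}
```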
Structured Log Protocol
Message Format (Protobuf) [DONE]
```protobuf
message JobLogEntry {
  string timestamp = 1;
  string job_id = 2;
  string partition_ref = 3;
  uint64 sequence_number = 4; // Monotonic sequence starting from 1 [DONE]
  oneof content {
    LogMessage log = 5;             // [DONE]
    MetricPoint metric = 6;         // [FUTURE]
    JobEvent event = 7;             // [DONE - WrapperJobEvent]
    PartitionManifest manifest = 8; // [DONE]
  }
}

message LogMessage { // [DONE]
  enum LogLevel {
    DEBUG = 0;
    INFO = 1;
    WARN = 2;
    ERROR = 3;
  }
  LogLevel level = 1;
  string message = 2;
  map<string, string> fields = 3;
}

message MetricPoint { // [FUTURE]
  string name = 1;
  double value = 2;
  map<string, string> labels = 3;
  string unit = 4;
}

message JobEvent { // [DONE - as WrapperJobEvent]
  string event_type = 1; // "task_launched", "heartbeat", "task_completed", etc.
  google.protobuf.Any details = 2;
  map<string, string> metadata = 3;
}
```
Log Stream Lifecycle
- Wrapper emits `config_validate_success` event (sequence #1) [DONE]
- Wrapper validates configuration [DONE]
- Wrapper emits `task_launch_success` event (sequence #2) [DONE]
- Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
- Wrapper emits periodic `heartbeat` events (every 30s) [DONE]
- Wrapper detects job completion [DONE]
- Wrapper emits `task_success`/`task_failed` event [DONE]
- Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
- Wrapper exits [DONE]
The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection. [DONE - sequence numbers implemented]
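A graph-side sketch of that check, with entries reduced to (sequence_number, is_manifest) pairs for brevity (the function and parameter names are illustrative):

```rust
fn check_stream(entries: impl Iterator<Item = (u64, bool)>) -> Result<(), String> {
    let mut expected = 1u64;
    let mut last_was_manifest = false;
    for (seq, is_manifest) in entries {
        if seq != expected {
            return Err(format!("gap or reorder: expected {expected}, got {seq}"));
        }
        expected += 1;
        // The manifest must be the final (highest-sequence) message.
        last_was_manifest = is_manifest;
    }
    if last_was_manifest {
        Ok(())
    } else {
        Err("stream ended without a PartitionManifest".to_string())
    }
}
```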
Wrapper Implementation [PARTIAL]
Interfaces [DONE]
```rust
trait JobWrapper {
    // Config mode - accepts PartitionRef objects
    fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>; // [DONE]

    // Exec mode - accepts serialized JobConfig
    fn exec(config: JobConfig) -> Result<()>; // [DONE]
}
```
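A hypothetical entrypoint dispatch over these two modes; the argument shape and helper names are assumptions, and the real wrapper's CLI may differ:

```rust
fn main() {
    let mode = std::env::args().nth(1).unwrap_or_default();
    let code = match mode.as_str() {
        // `config`: turn requested output partitions into a JobConfig,
        // printed to stdout for the graph to capture.
        "config" => run_config_mode(),
        // `exec`: run the job under the structured-log protocol above.
        "exec" => run_exec_mode(),
        _ => {
            eprintln!("usage: job_wrapper <config|exec> ...");
            64 // EX_USAGE, per the exit code standards below
        }
    };
    std::process::exit(code);
}

// Stubs standing in for the real mode implementations.
fn run_config_mode() -> i32 { 0 }
fn run_exec_mode() -> i32 { 0 }
```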
Exit Code Standards [PARTIAL]
Following POSIX conventions and avoiding collisions with standard exit codes:
Reference:
- https://manpages.ubuntu.com/manpages/noble/man3/sysexits.h.3head.html
- https://tldp.org/LDP/abs/html/exitcodes.html
```rust
// Standard POSIX codes we respect: [PARTIAL - basic forwarding only]
// 0  - Success
// 1  - General error
// 2  - Misuse of shell builtin
// 64 - Command line usage error (EX_USAGE)
// 65 - Data format error (EX_DATAERR)
// 66 - Cannot open input (EX_NOINPUT)
// 69 - Service unavailable (EX_UNAVAILABLE)
// 70 - Internal software error (EX_SOFTWARE)
// 71 - System error (EX_OSERR)
// 73 - Can't create output file (EX_CANTCREAT)
// 74 - Input/output error (EX_IOERR)
// 75 - Temp failure; retry (EX_TEMPFAIL)
// 77 - Permission denied (EX_NOPERM)
// 78 - Configuration error (EX_CONFIG)

// DataBuild-specific codes (100+ to avoid collisions): [FUTURE]
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures
// 120-129 - User-defined resource failures
// 130+    - Other user-defined codes

enum ExitCodeCategory { // [FUTURE]
    Success,          // 0
    StandardError,    // 1-63 (shell/system)
    PosixError,       // 64-78 (sysexits.h)
    TransientFailure, // 75 (EX_TEMPFAIL) or 110-119
    UserDefined,      // 100+
}
```
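A sketch of how that [FUTURE] categorization could map raw exit codes onto the `ExitCodeCategory` enum above; this just encodes the ranges listed, not a final mapping:

```rust
fn categorize(code: i32) -> ExitCodeCategory {
    match code {
        0 => ExitCodeCategory::Success,
        75 | 110..=119 => ExitCodeCategory::TransientFailure, // EX_TEMPFAIL or user transient
        64..=78 => ExitCodeCategory::PosixError,              // sysexits.h range
        100.. => ExitCodeCategory::UserDefined,               // DataBuild-specific, 100+
        _ => ExitCodeCategory::StandardError,                 // 1-63 (and anything else)
    }
}
```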
Platform-Specific Log Handling
Local Execution [DONE]
- Graph spawns wrapper process [DONE]
- Graph reads from stdout pipe directly [DONE]
- PartitionManifest indicates completion [DONE]
Docker [FUTURE]
- Graph runs `docker run` with wrapper as entrypoint
- Graph uses `docker logs -f` to tail output
- Logs persist after container exit
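As a sketch of the tailing side (using only the stock docker CLI; the container name is illustrative):

```rust
use std::process::{Command, Stdio};

fn tail_container_logs(container: &str) -> std::io::Result<std::process::Child> {
    // `docker logs -f` follows the wrapper's stdout, and Docker persists the
    // log stream even after the container exits.
    Command::new("docker")
        .args(["logs", "-f", container])
        .stdout(Stdio::piped())
        .spawn()
}
```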
Kubernetes [FUTURE]
- Job pods use wrapper as container entrypoint
- Graph tails logs via K8s API
- Configure `terminationGracePeriodSeconds` for log retention
Cloud Run / Lambda [FUTURE]
- Wrapper logs to platform logging service
- Graph queries logs via platform API
- Natural buffering and persistence
Observability Features
Metrics Collection [FUTURE]
For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose via Prometheus format:
```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "metric": {
      "name": "rows_processed",
      "value": 1500000,
      "labels": {
        "partition": "date=2025-01-27",
        "stage": "transform"
      },
      "unit": "count"
    }
  }
}
```
The graph component will: [FUTURE]
- Aggregate metrics from job logs
- Expose them in Prometheus format for scraping (when running as a service)
- Store summary metrics in the BEL for historical analysis
For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
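A graph-side sketch of that aggregation path, folding MetricPoints into running sums and rendering Prometheus text exposition (the struct and method names are illustrative):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct MetricStore {
    // key: (metric name, sorted label pairs) -> running sum
    sums: HashMap<(String, Vec<(String, String)>), f64>,
}

impl MetricStore {
    fn record(&mut self, name: &str, value: f64, labels: &HashMap<String, String>) {
        let mut key_labels: Vec<_> =
            labels.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
        key_labels.sort(); // canonical label order so equal label sets merge
        *self.sums.entry((name.to_string(), key_labels)).or_default() += value;
    }

    fn render_prometheus(&self) -> String {
        let mut out = String::new();
        for ((name, labels), sum) in &self.sums {
            let label_str: Vec<String> =
                labels.iter().map(|(k, v)| format!("{k}=\"{v}\"")).collect();
            out.push_str(&format!("{name}{{{}}} {sum}\n", label_str.join(",")));
        }
        out
    }
}
```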
Heartbeating [DONE]
30-second heartbeat interval by default, configurable via `DATABUILD_HEARTBEAT_INTERVAL_MS`:
```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "JobEvent": {
      "event_type": "heartbeat",
      "metadata": {
        "memory_usage_mb": "1024.256",
        "cpu_usage_percent": "85.200"
      }
    }
  }
}
```
Implementation Details:
- Uses sysinfo crate for cross-platform process monitoring
- Heartbeat thread communicates via channels with main thread
- Includes memory usage (MB) and CPU usage (%) with 3 decimal precision
- Configurable interval for testing (default 30s, test environments use 100ms)
- Proper dependency injection via LogSink trait for testability
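A condensed sketch of that heartbeat loop, assuming sysinfo ~0.30 APIs and a LogSink trait shaped roughly as described above (the trait signature here is an assumption; the real one lives in the wrapper crate):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use sysinfo::{Pid, System};

// Assumed shape of the injected sink used for testability.
trait LogSink: Send {
    fn emit_heartbeat(&self, memory_usage_mb: f64, cpu_usage_percent: f64);
}

fn spawn_heartbeat(sink: Box<dyn LogSink>, interval: Duration) -> mpsc::Sender<()> {
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let pid = Pid::from_u32(std::process::id());
    thread::spawn(move || {
        let mut sys = System::new_all();
        loop {
            match stop_rx.recv_timeout(interval) {
                // Timeout doubles as the heartbeat timer.
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    sys.refresh_process(pid);
                    if let Some(proc_) = sys.process(pid) {
                        let mem_mb = proc_.memory() as f64 / (1024.0 * 1024.0); // bytes in 0.30
                        let cpu = proc_.cpu_usage() as f64;
                        // Round to 3 decimals, matching the example payload above.
                        sink.emit_heartbeat(
                            (mem_mb * 1000.0).round() / 1000.0,
                            (cpu * 1000.0).round() / 1000.0,
                        );
                    }
                }
                // Explicit stop, or the main thread hung up: shut down.
                _ => break,
            }
        }
    });
    stop_tx
}
```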
Log Bandwidth Limits [FUTURE]
To prevent log flooding:
- Maximum log rate: 1000 messages/second
- Maximum message size: 1MB
- If limits exceeded: Wrapper emits rate limit warning and drops messages
- Final metrics show dropped message count
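A minimal sketch of how those limits might be enforced, using a one-second window counter with thresholds taken from the list above:

```rust
use std::time::{Duration, Instant};

const MAX_MSGS_PER_SEC: u32 = 1000;
const MAX_MSG_BYTES: usize = 1024 * 1024; // 1MB

struct RateLimiter {
    window_start: Instant,
    sent_in_window: u32,
    dropped: u64,
}

impl RateLimiter {
    fn new() -> Self {
        Self { window_start: Instant::now(), sent_in_window: 0, dropped: 0 }
    }

    /// Returns true if the message may be emitted.
    fn allow(&mut self, msg_len: usize) -> bool {
        // Reset the counter once the one-second window elapses.
        if self.window_start.elapsed() >= Duration::from_secs(1) {
            self.window_start = Instant::now();
            self.sent_in_window = 0;
        }
        if msg_len > MAX_MSG_BYTES || self.sent_in_window >= MAX_MSGS_PER_SEC {
            self.dropped += 1; // surfaced in the final metrics
            return false;
        }
        self.sent_in_window += 1;
        true
    }
}
```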
Testing Strategy
Unit Tests [MOSTLY DONE]
- Log parsing and serialization [DONE - protobuf serde]
- State machine transitions [DONE - JobLogEntry sequence validation]
- Heartbeat functionality [DONE - with dependency injection]
- CPU/memory metrics collection [DONE - with configurable intervals]
- Exit code categorization [FUTURE]
- Rate limiting behavior [FUTURE]
Integration Tests [PARTIAL]
- Full job execution lifecycle [DONE - via e2e tests]
- Resource metrics validation [DONE - CPU-intensive workload testing]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]
Platform Tests [PARTIAL]
- Local process execution [DONE]
- Docker container runs [FUTURE]
- Kubernetes job pods [FUTURE]
- Cloud Run invocations [FUTURE]
Failure Scenario Tests [PARTIAL]
- Job crashes (SIGSEGV, SIGKILL) [DONE - basic exit code forwarding]
- Wrapper crashes [FUTURE]
- Log tailing interruptions [FUTURE]
- Platform-specific failures [FUTURE]
Implementation Phases
Phase 0: Minimal Bootstrap [DONE]
Implement the absolute minimum to unblock development and testing:
- Basic wrapper that only handles happy path [DONE]
- Support for local execution only [DONE]
- Minimal log parsing in graph [DONE - wrapper emits structured logs]
- Integration with existing example jobs [DONE - e2e tests passing]
This phase delivers a working end-to-end system that can be continuously evolved. [DONE]
Completed Implementation Details:
- Created databuild/job/src/main.rs with config/exec modes [DONE]
- Uses protobuf types from databuild.proto [DONE]
- Emits JobLogEntry with sequence numbers [DONE]
- Follows core-build.md state diagram exactly [DONE]
- Forwards job stdout/stderr as LogMessage entries [DONE]
- Emits PartitionManifest on successful completion [DONE]
- Properly handles job failures with exit codes [DONE]
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]
Phase 1: Core Protocol [MOSTLY DONE]
- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [DONE - with CPU/memory metrics]
- Resource metrics collection [DONE - CPU time, peak memory, runtime]
- Dependency injection for testability [DONE - LogSink trait pattern]
- Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]
Phase 2: Platform Support [FUTURE]
- Docker integration [FUTURE]
- Kubernetes support [FUTURE]
- Cloud platform adapters [FUTURE]
- Platform-specific testing [FUTURE]
Phase 3: Production Hardening [FUTURE]
- Rate limiting [FUTURE]
- Error recovery [FUTURE]
- Performance optimization [FUTURE]
- Monitoring integration [FUTURE]
Phase 4: Advanced Features [FUTURE]
- In-process config for library jobs [FUTURE]
- Custom metrics backends [FUTURE]
- Advanced failure analysis [FUTURE]
Success Criteria
- Zero Network Dependencies: Jobs run without any network access [DONE]
- Platform Parity: Identical behavior across all execution platforms [PARTIAL - local only]
- Minimal Overhead: < 100ms wrapper overhead for config, < 1s for exec [DONE - fast execution]
- Complete Observability: All job state changes captured in logs [DONE - core events captured]
- Graceful Failures: No log data loss even in crash scenarios [PARTIAL - basic failure handling]
Next Steps
- Implement minimal bootstrap wrapper [DONE]
- Test with existing example jobs [DONE]
- Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
- Gradually add features per implementation phases [IN PROGRESS]
Phase 1 Achievements:
- ✅ Heartbeating support with CPU/memory metrics [DONE]
- ✅ Dependency injection for testability (LogSink trait) [DONE]
- ✅ Resource metrics collection (CPU time, peak memory, runtime) [DONE]
- ✅ Comprehensive test coverage for heartbeats and metrics [DONE]
- ✅ Configurable intervals for different environments [DONE]
Remaining for Phase 1 Completion:
- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]
Recent Implementation Details:
- Uses sysinfo 0.30 for cross-platform process monitoring
- Thread-safe heartbeat communication via mpsc channels
- Floating-point metrics with 3 decimal precision (f64)
- Environment variable configuration: `DATABUILD_HEARTBEAT_INTERVAL_MS`, `DATABUILD_METRICS_INTERVAL_MS`
- Robust test infrastructure with synthetic CPU-intensive workloads
- Proper CPU time calculation: (average_cpu_percent / 100.0) × wall_clock_time
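That calculation as code, assuming the samples are instantaneous `cpu_usage()` percentages collected at the metrics interval:

```rust
fn cpu_time_secs(cpu_percent_samples: &[f64], wall_clock_secs: f64) -> f64 {
    if cpu_percent_samples.is_empty() {
        return 0.0;
    }
    let avg = cpu_percent_samples.iter().sum::<f64>() / cpu_percent_samples.len() as f64;
    // (average_cpu_percent / 100.0) * wall_clock_time
    (avg / 100.0) * wall_clock_secs
}
```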