# Job Wrapper v2 Plan

## Status

- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [MOSTLY DONE - heartbeating and metrics implemented]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]

## Required Reading

Before implementing this plan, engineers should thoroughly understand these design documents:

- **[DESIGN.md](../DESIGN.md)** - Overall DataBuild architecture and job execution model
- **[design/core-build.md](../design/core-build.md)** - Core build semantics and job lifecycle state machines
- **[design/observability.md](../design/observability.md)** - Observability strategy and telemetry requirements
- **[design/build-event-log.md](../design/build-event-log.md)** - Event sourcing model and BEL integration
- **[databuild.proto](../databuild/databuild.proto)** - System interfaces and data structures

## Overview

The job wrapper is a critical component that mediates between DataBuild graphs and job executables, providing observability, error handling, and state management. This plan describes the next-generation job wrapper implementation in Rust.

## Architecture

### Core Design Principles

1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs [DONE]
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms [PARTIAL - local only]
3. **Zero Network Requirements**: Jobs don't need to connect to any services [DONE]
4. **Fail-Safe**: Graceful handling of job crashes and fast completions [PARTIAL - basic handling only]

### Communication Model

```
Graph → Job: Launch with JobConfig (via CLI args/env)
Job → Graph: Structured logs (stdout)
Graph: Tails logs and interprets into metrics, events, and manifests
```
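For local execution this model reduces to a process pipe. The sketch below shows how a graph might spawn the wrapper and tail its stdout; the `job_wrapper` binary name and argument shape are illustrative assumptions, not the actual CLI.

```rust
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};

// Sketch: the graph launches the wrapper and tails its stdout line by line.
fn run_job(config_json: &str) -> std::io::Result<i32> {
    let mut child = Command::new("job_wrapper") // illustrative binary name
        .arg("exec")
        .arg(config_json) // serialized JobConfig
        .stdout(Stdio::piped()) // Job -> Graph channel is stdout
        .spawn()?;

    let stdout = child.stdout.take().expect("stdout was piped");
    for line in BufReader::new(stdout).lines() {
        // Each line is one structured JobLogEntry; the graph interprets it
        // into metrics, events, or the final PartitionManifest.
        let entry = line?;
        println!("entry: {entry}");
    }

    Ok(child.wait()?.code().unwrap_or(-1))
}
```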
## Structured Log Protocol

### Message Format (Protobuf) [DONE]

```proto
message JobLogEntry {
  string timestamp = 1;
  string job_id = 2;
  string partition_ref = 3;
  uint64 sequence_number = 4; // Monotonic sequence starting from 1 [DONE]

  oneof content {
    LogMessage log = 5;             // [DONE]
    MetricPoint metric = 6;         // [FUTURE]
    JobEvent event = 7;             // [DONE - WrapperJobEvent]
    PartitionManifest manifest = 8; // [DONE]
  }
}

message LogMessage { // [DONE]
  enum LogLevel {
    DEBUG = 0;
    INFO = 1;
    WARN = 2;
    ERROR = 3;
  }
  LogLevel level = 1;
  string message = 2;
  map<string, string> fields = 3;
}

message MetricPoint { // [FUTURE]
  string name = 1;
  double value = 2;
  map<string, string> labels = 3;
  string unit = 4;
}

message JobEvent { // [DONE - as WrapperJobEvent]
  string event_type = 1; // "task_launched", "heartbeat", "task_completed", etc.
  google.protobuf.Any details = 2;
  map<string, string> metadata = 3;
}
```
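On the wire, each entry is one JSON object per stdout line. Below is a minimal emitter sketch using hand-written serde mirrors of these messages; the real wrapper uses the types generated from databuild.proto, so the struct definitions here are illustrative stand-ins.

```rust
use serde::Serialize;

// Hypothetical serde mirrors of the generated types, for illustration only.
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
enum Content {
    Log { level: String, message: String },
}

#[derive(Serialize)]
struct JobLogEntry {
    timestamp: String,
    job_id: String,
    partition_ref: String,
    sequence_number: u64,
    content: Content,
}

// One JSON object per line; stdout is the only channel back to the graph.
fn emit(entry: &JobLogEntry) {
    println!("{}", serde_json::to_string(entry).expect("entry serializes"));
}
```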
### Log Stream Lifecycle

1. Wrapper validates configuration [DONE]
2. Wrapper emits `config_validate_success` event (sequence #1) [DONE]
3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
4. Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
5. Wrapper emits periodic `heartbeat` events (every 30s) [DONE]
6. Wrapper detects job completion [DONE]
7. Wrapper emits `task_success`/`task_failed` event [DONE]
8. Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
9. Wrapper exits [DONE]

The PartitionManifest serves as the implicit end-of-logs marker: the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection. [DONE - sequence numbers implemented]
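A graph-side checker for that guarantee can be a few lines. This is a sketch, assuming entries arrive one at a time and expose their sequence number:

```rust
// Sketch: graph-side sequence validation over the tailed log stream.
struct SequenceChecker {
    next_expected: u64, // sequences start at 1
}

impl SequenceChecker {
    fn new() -> Self {
        Self { next_expected: 1 }
    }

    fn observe(&mut self, seq: u64) -> Result<(), String> {
        match seq.cmp(&self.next_expected) {
            std::cmp::Ordering::Equal => {
                self.next_expected += 1;
                Ok(())
            }
            std::cmp::Ordering::Greater => {
                Err(format!("gap: expected {}, got {}", self.next_expected, seq))
            }
            std::cmp::Ordering::Less => Err(format!("out-of-order entry {seq}")),
        }
    }
}
```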
## Wrapper Implementation [PARTIAL]

### Interfaces [DONE]

```rust
trait JobWrapper {
    // Config mode - accepts PartitionRef objects
    fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>; // [DONE]

    // Exec mode - accepts serialized JobConfig
    fn exec(config: JobConfig) -> Result<()>; // [DONE]
}
```
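The two modes map directly onto the wrapper's CLI. A minimal sketch of the dispatch; the exact argument shapes are assumptions:

```rust
// Sketch of the two-mode CLI dispatch; argument shapes are illustrative.
fn main() {
    let args: Vec<String> = std::env::args().collect();
    match args.get(1).map(String::as_str) {
        Some("config") => {
            // Parse PartitionRefs from the remaining args and print a JobConfig.
        }
        Some("exec") => {
            // Deserialize the JobConfig and run the wrapped job.
        }
        _ => {
            eprintln!("usage: job_wrapper <config|exec> ...");
            std::process::exit(64); // EX_USAGE
        }
    }
}
```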
### Exit Code Standards [PARTIAL]

Following POSIX conventions and avoiding collisions with standard exit codes:

Reference:
- https://manpages.ubuntu.com/manpages/noble/man3/sysexits.h.3head.html
- https://tldp.org/LDP/abs/html/exitcodes.html

```rust
// Standard POSIX codes we respect: [PARTIAL - basic forwarding only]
// 0  - Success
// 1  - General error
// 2  - Misuse of shell builtin
// 64 - Command line usage error (EX_USAGE)
// 65 - Data format error (EX_DATAERR)
// 66 - Cannot open input (EX_NOINPUT)
// 69 - Service unavailable (EX_UNAVAILABLE)
// 70 - Internal software error (EX_SOFTWARE)
// 71 - System error (EX_OSERR)
// 73 - Can't create output file (EX_CANTCREAT)
// 74 - Input/output error (EX_IOERR)
// 75 - Temp failure; retry (EX_TEMPFAIL)
// 77 - Permission denied (EX_NOPERM)
// 78 - Configuration error (EX_CONFIG)

// DataBuild-specific codes (100+ to avoid collisions): [FUTURE]
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures
// 120-129 - User-defined resource failures
// 130+    - Other user-defined codes

enum ExitCodeCategory { // [FUTURE]
    Success,          // 0
    StandardError,    // 1-63 (shell/system)
    PosixError,       // 64-78 (sysexits.h)
    TransientFailure, // 75 (EX_TEMPFAIL) or 110-119
    UserDefined,      // 100+
}
```
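A sketch of the [FUTURE] categorization implied by the ranges above. Note that 75 must be checked before the general sysexits range; the plan leaves 79-99 unspecified, so treating them as user-defined is an assumption here:

```rust
fn categorize(code: i32) -> ExitCodeCategory {
    match code {
        0 => ExitCodeCategory::Success,
        75 => ExitCodeCategory::TransientFailure, // EX_TEMPFAIL wins over PosixError
        110..=119 => ExitCodeCategory::TransientFailure,
        64..=78 => ExitCodeCategory::PosixError,
        1..=63 => ExitCodeCategory::StandardError,
        _ => ExitCodeCategory::UserDefined, // 100+, plus unspecified 79-99
    }
}
```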
## Platform-Specific Log Handling

### Local Execution [DONE]

- Graph spawns wrapper process [DONE]
- Graph reads from stdout pipe directly [DONE]
- PartitionManifest indicates completion [DONE]

### Docker [FUTURE]

- Graph runs `docker run` with wrapper as entrypoint
- Graph uses `docker logs -f` to tail output
- Logs persist after container exit

### Kubernetes [FUTURE]

- Job pods use wrapper as container entrypoint
- Graph tails logs via K8s API
- Configure `terminationGracePeriodSeconds` for log retention

### Cloud Run / Lambda [FUTURE]

- Wrapper logs to platform logging service
- Graph queries logs via platform API
- Natural buffering and persistence
## Observability Features

### Metrics Collection [FUTURE]

For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose in Prometheus format:

```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "metric": {
      "name": "rows_processed",
      "value": 1500000,
      "labels": {
        "partition": "date=2025-01-27",
        "stage": "transform"
      },
      "unit": "count"
    }
  }
}
```

The graph component will: [FUTURE]

- Aggregate metrics from job logs
- Expose them in Prometheus format for scraping (when running as a service)
- Store summary metrics in the BEL for historical analysis

For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
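A sketch of the graph-side aggregation under stated assumptions: last-write-wins gauges only (real counters and histograms would need proper accumulation), rendered in the Prometheus text exposition format.

```rust
use std::collections::HashMap;

// Hypothetical parsed form of a MetricPoint log entry.
struct MetricPoint {
    name: String,
    value: f64,
    labels: Vec<(String, String)>,
}

// Fold points into last-write-wins series and render Prometheus text format.
fn render_prometheus(points: &[MetricPoint]) -> String {
    let mut latest: HashMap<String, f64> = HashMap::new();
    for p in points {
        let labels = p
            .labels
            .iter()
            .map(|(k, v)| format!("{k}=\"{v}\""))
            .collect::<Vec<_>>()
            .join(",");
        latest.insert(format!("{}{{{}}}", p.name, labels), p.value);
    }
    latest
        .iter()
        .map(|(series, value)| format!("{series} {value}\n"))
        .collect()
}
```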
### Heartbeating [DONE]

Heartbeats fire at a fixed 30-second interval by default, configurable via `DATABUILD_HEARTBEAT_INTERVAL_MS`:

```json
{
  "timestamp": "2025-01-27T10:30:45Z",
  "content": {
    "event": {
      "event_type": "heartbeat",
      "metadata": {
        "memory_usage_mb": "1024.256",
        "cpu_usage_percent": "85.200"
      }
    }
  }
}
```

**Implementation Details:**

- Uses the sysinfo crate for cross-platform process monitoring
- Heartbeat thread communicates with the main thread via channels
- Includes memory usage (MB) and CPU usage (%) with 3-decimal precision
- Configurable interval for testing (default 30s; test environments use 100ms)
- Dependency injection via the LogSink trait for testability
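A sketch of that loop, assuming the sysinfo 0.30 API (`System::refresh_process`, `Process::memory` in bytes, `Process::cpu_usage` in percent); the `LogSink` shape here is a simplified stand-in for the real trait:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use sysinfo::{Pid, System};

// Simplified stand-in for the wrapper's LogSink trait.
trait LogSink: Send {
    fn emit_event(&self, event_type: &str, metadata: Vec<(String, String)>);
}

fn spawn_heartbeat(
    sink: Box<dyn LogSink>,
    pid: Pid, // the process to sample, e.g. the wrapped job
    stop: mpsc::Receiver<()>,
) -> thread::JoinHandle<()> {
    let interval_ms: u64 = std::env::var("DATABUILD_HEARTBEAT_INTERVAL_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(30_000); // default 30s

    thread::spawn(move || {
        let mut sys = System::new();
        loop {
            // recv_timeout doubles as the interval timer and the stop signal.
            match stop.recv_timeout(Duration::from_millis(interval_ms)) {
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    // cpu_usage() needs two refreshes to be meaningful; the
                    // persistent System provides that across iterations.
                    sys.refresh_process(pid);
                    if let Some(p) = sys.process(pid) {
                        let mem_mb = p.memory() as f64 / (1024.0 * 1024.0);
                        sink.emit_event(
                            "heartbeat",
                            vec![
                                ("memory_usage_mb".into(), format!("{mem_mb:.3}")),
                                ("cpu_usage_percent".into(), format!("{:.3}", p.cpu_usage())),
                            ],
                        );
                    }
                }
                _ => break, // stop requested or sender dropped
            }
        }
    })
}
```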
### Log Bandwidth Limits [FUTURE]

To prevent log flooding:

- Maximum log rate: 1000 messages/second
- Maximum message size: 1MB
- If limits are exceeded, the wrapper emits a rate limit warning and drops messages
- Final metrics report the dropped message count
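A sketch of this [FUTURE] limiter with the numbers above, using fixed one-second windows (the windowing strategy is an assumption; a token bucket would also work):

```rust
use std::time::Instant;

// Sketch: fixed one-second windows capped at 1000 messages, 1MB per
// message, counting drops for inclusion in the final metrics.
struct LogRateLimiter {
    window_start: Instant,
    sent_in_window: u32,
    dropped_total: u64,
}

impl LogRateLimiter {
    const MAX_PER_SECOND: u32 = 1000;
    const MAX_MESSAGE_BYTES: usize = 1 << 20; // 1MB

    fn new() -> Self {
        Self {
            window_start: Instant::now(),
            sent_in_window: 0,
            dropped_total: 0,
        }
    }

    /// Returns true if the message may be emitted, false if it must be dropped.
    fn admit(&mut self, message_len: usize) -> bool {
        if self.window_start.elapsed().as_secs() >= 1 {
            self.window_start = Instant::now();
            self.sent_in_window = 0;
        }
        if message_len > Self::MAX_MESSAGE_BYTES
            || self.sent_in_window >= Self::MAX_PER_SECOND
        {
            self.dropped_total += 1;
            return false;
        }
        self.sent_in_window += 1;
        true
    }
}
```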
## Testing Strategy

### Unit Tests [MOSTLY DONE]

- Log parsing and serialization [DONE - protobuf serde]
- State machine transitions [DONE - JobLogEntry sequence validation]
- Heartbeat functionality [DONE - with dependency injection]
- CPU/memory metrics collection [DONE - with configurable intervals]
- Exit code categorization [FUTURE]
- Rate limiting behavior [FUTURE]

### Integration Tests [PARTIAL]

- Full job execution lifecycle [DONE - via e2e tests]
- Resource metrics validation [DONE - CPU-intensive workload testing]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]

### Platform Tests [PARTIAL]

- Local process execution [DONE]
- Docker container runs [FUTURE]
- Kubernetes job pods [FUTURE]
- Cloud Run invocations [FUTURE]

### Failure Scenario Tests [PARTIAL]

- Job crashes (SIGSEGV, SIGKILL) [DONE - basic exit code forwarding]
- Wrapper crashes [FUTURE]
- Log tailing interruptions [FUTURE]
- Platform-specific failures [FUTURE]
## Implementation Phases

### Phase 0: Minimal Bootstrap [DONE]

Implement the absolute minimum to unblock development and testing:

- Basic wrapper that only handles the happy path [DONE]
- Support for local execution only [DONE]
- Minimal log parsing in graph [DONE - wrapper emits structured logs]
- Integration with existing example jobs [DONE - e2e tests passing]

This phase delivers a working end-to-end system that can be continuously evolved. [DONE]

**Completed Implementation Details:**

- Created databuild/job/src/main.rs with config/exec modes [DONE]
- Uses protobuf types from databuild.proto [DONE]
- Emits JobLogEntry with sequence numbers [DONE]
- Follows core-build.md state diagram exactly [DONE]
- Forwards job stdout/stderr as LogMessage entries [DONE]
- Emits PartitionManifest on successful completion [DONE]
- Properly handles job failures with exit codes [DONE]
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]

### Phase 1: Core Protocol [MOSTLY DONE]

- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [DONE - with CPU/memory metrics]
- Resource metrics collection [DONE - CPU time, peak memory, runtime]
- Dependency injection for testability [DONE - LogSink trait pattern]
- Graph-side log parser improvements [FUTURE - wrapper emits; graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]

### Phase 2: Platform Support [FUTURE]

- Docker integration [FUTURE]
- Kubernetes support [FUTURE]
- Cloud platform adapters [FUTURE]
- Platform-specific testing [FUTURE]

### Phase 3: Production Hardening [FUTURE]

- Rate limiting [FUTURE]
- Error recovery [FUTURE]
- Performance optimization [FUTURE]
- Monitoring integration [FUTURE]

### Phase 4: Advanced Features [FUTURE]

- In-process config for library jobs [FUTURE]
- Custom metrics backends [FUTURE]
- Advanced failure analysis [FUTURE]
## Success Criteria

1. **Zero Network Dependencies**: Jobs run without any network access [DONE]
2. **Platform Parity**: Identical behavior across all execution platforms [PARTIAL - local only]
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec [DONE - fast execution]
4. **Complete Observability**: All job state changes captured in logs [DONE - core events captured]
5. **Graceful Failures**: No log data loss even in crash scenarios [PARTIAL - basic failure handling]

## Next Steps

1. Implement minimal bootstrap wrapper [DONE]
2. Test with existing example jobs [DONE]
3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
4. Gradually add features per implementation phases [IN PROGRESS]

**Phase 1 Achievements:**

- ✅ Heartbeating support with CPU/memory metrics [DONE]
- ✅ Dependency injection for testability (LogSink trait) [DONE]
- ✅ Resource metrics collection (CPU time, peak memory, runtime) [DONE]
- ✅ Comprehensive test coverage for heartbeats and metrics [DONE]
- ✅ Configurable intervals for different environments [DONE]

**Remaining for Phase 1 Completion:**

- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]
**Recent Implementation Details:**

- Uses sysinfo 0.30 for cross-platform process monitoring
- Thread-safe heartbeat communication via mpsc channels
- Floating-point metrics with 3-decimal precision (f64)
- Environment variable configuration: `DATABUILD_HEARTBEAT_INTERVAL_MS`, `DATABUILD_METRICS_INTERVAL_MS`
- Robust test infrastructure with synthetic CPU-intensive workloads
- CPU time calculation: (average_cpu_percent / 100.0) × wall_clock_time
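That last formula, as code (a direct transcription of the calculation, not the actual implementation):

```rust
use std::time::Duration;

// CPU time = average utilization applied over the wall-clock runtime.
fn cpu_time(average_cpu_percent: f64, wall_clock: Duration) -> Duration {
    wall_clock.mul_f64(average_cpu_percent / 100.0)
}
```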