Add exec wrapper tests

This commit is contained in:
Stuart Axelbrooke 2025-07-28 03:16:34 -07:00
parent eb26bd0274
commit f1bd273816
4 changed files with 299 additions and 94 deletions

View file

@ -96,6 +96,9 @@ def lookup_job_for_partition(partition_ref: str) -> str:
- **Missing partition refs**: All outputs must be addressable via partition references
- **Not adding new generated files to OpenAPI outs**: Bazel hermeticity demands that we specify each output file, so when the OpenAPI code gen would create new files, we need to explicitly add them to the target's outs field.
## Notes / Tips
- Rust dependencies are implemented via rules_rust, so new dependencies should be added in the `MODULE.bazel` file.
## Documentation
We use plans / designs in the [plans](./plans/) directory to anchor most large scale efforts. We create plans that are good bets, though not necessarily exhaustive, then (and this is critical) we update them after the work is completed, or after significant progress towards completion.

View file

@ -1,8 +1,8 @@
load("@rules_rust//rust:defs.bzl", "rust_binary")
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test")
rust_binary(
name = "job_wrapper",
srcs = ["src/main.rs"],
srcs = ["main.rs"],
visibility = ["//visibility:public"],
deps = [
"//databuild",
@ -11,3 +11,14 @@ rust_binary(
"@crates//:uuid",
],
)
rust_test(
name = "job_wrapper_test",
srcs = ["main.rs"],
deps = [
"//databuild",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:uuid",
],
)

View file

@ -263,4 +263,168 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// Helper functions for testing
fn generate_test_config(outputs: &[String]) -> JobConfig {
JobConfig {
outputs: outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect(),
inputs: vec![],
args: outputs.to_vec(),
env: {
let mut env_map = std::collections::HashMap::new();
if let Some(partition_ref) = outputs.first() {
env_map.insert("PARTITION_REF".to_string(), partition_ref.clone());
}
env_map
},
}
}
#[test]
fn test_job_log_entry_serialization() {
let entry = JobLogEntry {
timestamp: "1234567890".to_string(),
job_id: "test-id".to_string(),
partition_ref: "test/partition".to_string(),
sequence_number: 1,
content: Some(job_log_entry::Content::Log(LogMessage {
level: log_message::LogLevel::Info as i32,
message: "test message".to_string(),
fields: std::collections::HashMap::new(),
})),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"timestamp\":\"1234567890\""));
assert!(json.contains("\"sequence_number\":1"));
assert!(json.contains("\"Log\":{")); // Capitalized field name
assert!(json.contains("\"message\":\"test message\""));
}
#[test]
fn test_sequence_number_increment() {
let mut wrapper = JobWrapper::new();
assert_eq!(wrapper.next_sequence(), 1);
assert_eq!(wrapper.next_sequence(), 2);
assert_eq!(wrapper.next_sequence(), 3);
}
#[test]
fn test_config_mode_output_format() {
let outputs = vec!["test/partition".to_string()];
let config = generate_test_config(&outputs);
// Verify it produces expected structure
assert_eq!(config.outputs.len(), 1);
assert_eq!(config.outputs[0].r#str, "test/partition");
assert_eq!(config.args, outputs);
assert_eq!(config.env.get("PARTITION_REF"), Some(&"test/partition".to_string()));
}
#[test]
fn test_multiple_outputs_config() {
let outputs = vec![
"reviews/date=2025-01-01".to_string(),
"reviews/date=2025-01-02".to_string(),
];
let config = generate_test_config(&outputs);
assert_eq!(config.outputs.len(), 2);
assert_eq!(config.outputs[0].r#str, "reviews/date=2025-01-01");
assert_eq!(config.outputs[1].r#str, "reviews/date=2025-01-02");
// First output is used as PARTITION_REF
assert_eq!(config.env.get("PARTITION_REF"), Some(&"reviews/date=2025-01-01".to_string()));
}
#[test]
fn test_wrapper_job_event_creation() {
// Test success event
let event = WrapperJobEvent {
event_type: "task_success".to_string(),
job_status: Some("JOB_COMPLETED".to_string()),
exit_code: Some(0),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_success");
assert_eq!(event.job_status, Some("JOB_COMPLETED".to_string()));
assert_eq!(event.exit_code, Some(0));
// Test failure event
let event = WrapperJobEvent {
event_type: "task_failed".to_string(),
job_status: Some("JOB_FAILED".to_string()),
exit_code: Some(1),
metadata: std::collections::HashMap::new(),
};
assert_eq!(event.event_type, "task_failed");
assert_eq!(event.job_status, Some("JOB_FAILED".to_string()));
assert_eq!(event.exit_code, Some(1));
}
#[test]
fn test_log_message_levels() {
let info_log = LogMessage {
level: log_message::LogLevel::Info as i32,
message: "info message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(info_log.level, log_message::LogLevel::Info as i32);
let error_log = LogMessage {
level: log_message::LogLevel::Error as i32,
message: "error message".to_string(),
fields: std::collections::HashMap::new(),
};
assert_eq!(error_log.level, log_message::LogLevel::Error as i32);
}
#[test]
fn test_partition_manifest_structure() {
let config = generate_test_config(&vec!["test/partition".to_string()]);
let manifest = PartitionManifest {
outputs: config.outputs.clone(),
inputs: vec![],
start_time: 1234567890,
end_time: 1234567900,
task: Some(Task {
job: Some(JobLabel {
label: "//test:job".to_string(),
}),
config: Some(config),
}),
metadata: std::collections::HashMap::new(),
};
assert_eq!(manifest.outputs.len(), 1);
assert_eq!(manifest.outputs[0].r#str, "test/partition");
assert_eq!(manifest.end_time - manifest.start_time, 10);
assert!(manifest.task.is_some());
}
#[test]
fn test_timestamp_generation() {
let ts1 = JobWrapper::get_timestamp();
std::thread::sleep(std::time::Duration::from_millis(10));
let ts2 = JobWrapper::get_timestamp();
// Timestamps should be parseable as integers
let t1: u64 = ts1.parse().expect("Should be valid timestamp");
let t2: u64 = ts2.parse().expect("Should be valid timestamp");
// Second timestamp should be equal or greater
assert!(t2 >= t1);
}
#[test]
fn test_job_wrapper_initialization() {
let wrapper = JobWrapper::new();
assert_eq!(wrapper.sequence_number, 0);
assert!(!wrapper.job_id.is_empty());
assert!(wrapper.start_time > 0);
}
}

View file

@ -1,5 +1,12 @@
# Job Wrapper v2 Plan
## Status
- Phase 0: Minimal Bootstrap [DONE]
- Phase 1: Core Protocol [PARTIAL]
- Phase 2: Platform Support [FUTURE]
- Phase 3: Production Hardening [FUTURE]
- Phase 4: Advanced Features [FUTURE]
## Required Reading
Before implementing this plan, engineers should thoroughly understand these design documents:
@ -16,10 +23,10 @@ The job wrapper is a critical component that mediates between DataBuild graphs a
## Architecture
### Core Design Principles
1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms
3. **Zero Network Requirements**: Jobs don't need to connect to any services
4. **Fail-Safe**: Graceful handling of job crashes and fast completions
1. **Single Communication Channel**: Jobs communicate with graphs exclusively through structured logs [DONE]
2. **Platform Agnostic**: Works identically across local, Docker, K8s, and cloud platforms [PARTIAL - local only]
3. **Zero Network Requirements**: Jobs don't need to connect to any services [DONE]
4. **Fail-Safe**: Graceful handling of job crashes and fast completions [PARTIAL - basic handling only]
### Communication Model
```
@ -30,23 +37,23 @@ Graph: Tails logs and interprets into metrics, events, and manifests
## Structured Log Protocol
### Message Format (Protobuf)
### Message Format (Protobuf) [DONE]
```proto
message JobLogEntry {
string timestamp = 1;
string job_id = 2;
string partition_ref = 3;
uint64 sequence_number = 4; // Monotonic sequence starting from 1
uint64 sequence_number = 4; // Monotonic sequence starting from 1 [DONE]
oneof content {
LogMessage log = 5;
MetricPoint metric = 6;
JobEvent event = 7;
PartitionManifest manifest = 8;
LogMessage log = 5; // [DONE]
MetricPoint metric = 6; // [FUTURE]
JobEvent event = 7; // [DONE - WrapperJobEvent]
PartitionManifest manifest = 8; // [DONE]
}
}
message LogMessage {
message LogMessage { // [DONE]
enum LogLevel {
DEBUG = 0;
INFO = 1;
@ -58,14 +65,14 @@ message LogMessage {
map<string, string> fields = 3;
}
message MetricPoint {
message MetricPoint { // [FUTURE]
string name = 1;
double value = 2;
map<string, string> labels = 3;
string unit = 4;
}
message JobEvent {
message JobEvent { // [DONE - as WrapperJobEvent]
string event_type = 1; // "task_launched", "heartbeat", "task_completed", etc
google.protobuf.Any details = 2;
map<string, string> metadata = 3;
@ -73,31 +80,32 @@ message JobEvent {
```
### Log Stream Lifecycle
1. Wrapper emits `job_config_started` event (sequence #1)
2. Wrapper validates configuration
3. Wrapper emits `task_launched` event (sequence #2)
4. Job executes, wrapper captures stdout/stderr (sequence #3+)
5. Wrapper emits periodic `heartbeat` events (every 30s)
6. Wrapper detects job completion
7. Wrapper emits `PartitionManifest` message (final required message with highest sequence number)
8. Wrapper exits
1. Wrapper emits `config_validate_success` event (sequence #1) [DONE]
2. Wrapper validates configuration [DONE]
3. Wrapper emits `task_launch_success` event (sequence #2) [DONE]
4. Job executes, wrapper captures stdout/stderr (sequence #3+) [DONE]
5. Wrapper emits periodic `heartbeat` events (every 30s) [FUTURE]
6. Wrapper detects job completion [DONE]
7. Wrapper emits `task_success`/`task_failed` event [DONE]
8. Wrapper emits `PartitionManifest` message (final required message with highest sequence number) [DONE]
9. Wrapper exits [DONE]
The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection.
The PartitionManifest serves as the implicit end-of-logs marker - the graph knows processing is complete when it sees this message. Sequence numbers enable the graph to detect missing or out-of-order messages and ensure reliable telemetry collection. [DONE - sequence numbers implemented]
## Wrapper Implementation
## Wrapper Implementation [PARTIAL]
### Interfaces
### Interfaces [DONE]
```rust
trait JobWrapper {
// Config mode - accepts PartitionRef objects
fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>;
fn config(outputs: Vec<PartitionRef>) -> Result<JobConfig>; // [DONE]
// Exec mode - accepts serialized JobConfig
fn exec(config: JobConfig) -> Result<()>;
fn exec(config: JobConfig) -> Result<()>; // [DONE]
}
```
### Exit Code Standards
### Exit Code Standards [PARTIAL]
Following POSIX conventions and avoiding collisions with standard exit codes:
@ -106,7 +114,7 @@ Reference:
- https://tldp.org/LDP/abs/html/exitcodes.html
```rust
// Standard POSIX codes we respect:
// Standard POSIX codes we respect: [PARTIAL - basic forwarding only]
// 0 - Success
// 1 - General error
// 2 - Misuse of shell builtin
@ -122,13 +130,13 @@ Reference:
// 77 - Permission denied (EX_NOPERM)
// 78 - Configuration error (EX_CONFIG)
// DataBuild-specific codes (100+ to avoid collisions):
// DataBuild-specific codes (100+ to avoid collisions): [FUTURE]
// 100-109 - User-defined permanent failures
// 110-119 - User-defined transient failures
// 120-129 - User-defined resource failures
// 130+ - Other user-defined codes
enum ExitCodeCategory {
enum ExitCodeCategory { // [FUTURE]
Success, // 0
StandardError, // 1-63 (shell/system)
PosixError, // 64-78 (sysexits.h)
@ -139,29 +147,29 @@ enum ExitCodeCategory {
## Platform-Specific Log Handling
### Local Execution
- Graph spawns wrapper process
- Graph reads from stdout pipe directly
- PartitionManifest indicates completion
### Local Execution [DONE]
- Graph spawns wrapper process [DONE]
- Graph reads from stdout pipe directly [DONE]
- PartitionManifest indicates completion [DONE]
### Docker
### Docker [FUTURE]
- Graph runs `docker run` with wrapper as entrypoint
- Graph uses `docker logs -f` to tail output
- Logs persist after container exit
### Kubernetes
### Kubernetes [FUTURE]
- Job pods use wrapper as container entrypoint
- Graph tails logs via K8s API
- Configure `terminationGracePeriodSeconds` for log retention
### Cloud Run / Lambda
### Cloud Run / Lambda [FUTURE]
- Wrapper logs to platform logging service
- Graph queries logs via platform API
- Natural buffering and persistence
## Observability Features
### Metrics Collection
### Metrics Collection [FUTURE]
For metrics, we'll use a simplified StatsD-like format in our structured logs, which the graph can aggregate and expose via Prometheus format:
@ -182,14 +190,14 @@ For metrics, we'll use a simplified StatsD-like format in our structured logs, w
}
```
The graph component will:
The graph component will: [FUTURE]
- Aggregate metrics from job logs
- Expose them in Prometheus format for scraping (when running as a service)
- Store summary metrics in the BEL for historical analysis
For CLI-invoked builds, metrics are still captured in the BEL but not exposed for scraping (which is acceptable since these are typically one-off runs).
### Heartbeating
### Heartbeating [FUTURE]
Fixed 30-second heartbeat interval (based on Kubernetes best practices):
@ -208,7 +216,7 @@ Fixed 30-second heartbeat interval (based on Kubernetes best practices):
}
```
### Log Bandwidth Limits
### Log Bandwidth Limits [FUTURE]
To prevent log flooding:
- Maximum log rate: 1000 messages/second
@ -218,76 +226,95 @@ To prevent log flooding:
## Testing Strategy
### Unit Tests
### Unit Tests [FUTURE]
- Log parsing and serialization
- Exit code categorization
- Rate limiting behavior
- State machine transitions
### Integration Tests
- Full job execution lifecycle
- Platform-specific log tailing
- Fast job completion handling
- Large log volume handling
### Integration Tests [PARTIAL]
- Full job execution lifecycle [DONE - via e2e tests]
- Platform-specific log tailing [PARTIAL - local only]
- Fast job completion handling [DONE]
- Large log volume handling [FUTURE]
### Platform Tests
- Local process execution
- Docker container runs
- Kubernetes job pods
- Cloud Run invocations
### Platform Tests [PARTIAL]
- Local process execution [DONE]
- Docker container runs [FUTURE]
- Kubernetes job pods [FUTURE]
- Cloud Run invocations [FUTURE]
### Failure Scenario Tests
- Job crashes (SIGSEGV, SIGKILL)
- Wrapper crashes
- Log tailing interruptions
- Platform-specific failures
### Failure Scenario Tests [PARTIAL]
- Job crashes (SIGSEGV, SIGKILL) [DONE - basic exit code forwarding]
- Wrapper crashes [FUTURE]
- Log tailing interruptions [FUTURE]
- Platform-specific failures [FUTURE]
## Implementation Phases
### Phase 0: Minimal Bootstrap
### Phase 0: Minimal Bootstrap [DONE]
Implement the absolute minimum to unblock development and testing:
- Basic wrapper that only handles happy path
- Support for local execution only
- Minimal log parsing in graph
- Integration with existing example jobs
- Basic wrapper that only handles happy path [DONE]
- Support for local execution only [DONE]
- Minimal log parsing in graph [DONE - wrapper emits structured logs]
- Integration with existing example jobs [DONE - e2e tests passing]
This phase delivers a working end-to-end system that can be continuously evolved.
This phase delivers a working end-to-end system that can be continuously evolved. [DONE]
### Phase 1: Core Protocol
- Define protobuf schemas
- Implement structured logger
- Add error handling and exit codes
- Implement heartbeating
- Graph-side log parser improvements
**Completed Implementation Details:**
- Created databuild/job/src/main.rs with config/exec modes [DONE]
- Uses protobuf types from databuild.proto [DONE]
- Emits JobLogEntry with sequence numbers [DONE]
- Follows core-build.md state diagram exactly [DONE]
- Forwards job stdout/stderr as LogMessage entries [DONE]
- Emits PartitionManifest on successful completion [DONE]
- Properly handles job failures with exit codes [DONE]
- Modified Bazel rules to use job_wrapper [DONE]
- All e2e tests passing [DONE]
### Phase 2: Platform Support
- Docker integration
- Kubernetes support
- Cloud platform adapters
- Platform-specific testing
### Phase 1: Core Protocol [PARTIAL]
- Define protobuf schemas [DONE - JobLogEntry, LogMessage, WrapperJobEvent]
- Implement structured logger [DONE - JSON serialization to stdout]
- Add error handling and exit codes [PARTIAL - basic forwarding only]
- Implement heartbeating [FUTURE]
- Graph-side log parser improvements [FUTURE - wrapper emits, graph needs to consume]
- MetricPoint message support [FUTURE]
- Advanced error categorization [FUTURE]
### Phase 3: Production Hardening
- Rate limiting
- Error recovery
- Performance optimization
- Monitoring integration
### Phase 2: Platform Support [FUTURE]
- Docker integration [FUTURE]
- Kubernetes support [FUTURE]
- Cloud platform adapters [FUTURE]
- Platform-specific testing [FUTURE]
### Phase 4: Advanced Features
- In-process config for library jobs
- Custom metrics backends
- Advanced failure analysis
### Phase 3: Production Hardening [FUTURE]
- Rate limiting [FUTURE]
- Error recovery [FUTURE]
- Performance optimization [FUTURE]
- Monitoring integration [FUTURE]
### Phase 4: Advanced Features [FUTURE]
- In-process config for library jobs [FUTURE]
- Custom metrics backends [FUTURE]
- Advanced failure analysis [FUTURE]
## Success Criteria
1. **Zero Network Dependencies**: Jobs run without any network access
2. **Platform Parity**: Identical behavior across all execution platforms
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec
4. **Complete Observability**: All job state changes captured in logs
5. **Graceful Failures**: No log data loss even in crash scenarios
1. **Zero Network Dependencies**: Jobs run without any network access [DONE]
2. **Platform Parity**: Identical behavior across all execution platforms [PARTIAL - local only]
3. **Minimal Overhead**: < 100ms wrapper overhead for config, < 1s for exec [DONE - fast execution]
4. **Complete Observability**: All job state changes captured in logs [DONE - core events captured]
5. **Graceful Failures**: No log data loss even in crash scenarios [PARTIAL - basic failure handling]
## Next Steps
1. Implement minimal bootstrap wrapper
2. Test with existing example jobs
3. Iterate on log format based on real usage
4. Gradually add features per implementation phases
1. Implement minimal bootstrap wrapper [DONE]
2. Test with existing example jobs [DONE]
3. Iterate on log format based on real usage [IN PROGRESS - Phase 1 continuation]
4. Gradually add features per implementation phases [IN PROGRESS]
**Immediate Next Steps for Phase 1 Completion:**
- Add heartbeating support [FUTURE]
- Implement MetricPoint logging [FUTURE]
- Add graph-side structured log consumption [FUTURE]
- Enhanced error categorization and exit code mapping [FUTURE]