Fix partition refs in logs

Stuart Axelbrooke 2025-07-28 21:11:58 -07:00
parent cccfbd1133
commit d9869123af
2 changed files with 41 additions and 27 deletions

View file

@@ -284,7 +284,7 @@ message BuildEvent {
 message JobLogEntry {
   string timestamp = 1;       // Unix timestamp
   string job_id = 2;          // UUID for this job execution
-  string partition_ref = 3;   // Primary partition being processed
+  repeated PartitionRef outputs = 3; // Partitions being processed by this job
   uint64 sequence_number = 4; // Monotonic sequence starting from 1
   oneof content {
@@ -589,6 +589,24 @@ message TaskTimelineEvent {
   optional string cancel_reason = 6;
 }
 
+///////////////////////////////////////////////////////////////////////////////////////////////
+// Job Log Access (Unified CLI/Service Interface)
+///////////////////////////////////////////////////////////////////////////////////////////////
+
+// Request for retrieving job logs
+message JobLogsRequest {
+  string job_run_id = 1;     // UUID of the job run
+  int64 since_timestamp = 2; // Unix timestamp (nanoseconds) - only logs after this time
+  int32 min_level = 3;       // Minimum LogLevel enum value (0=DEBUG, 1=INFO, 2=WARN, 3=ERROR)
+  uint32 limit = 4;          // Maximum number of entries to return
+}
+
+// Response containing job log entries
+message JobLogsResponse {
+  repeated JobLogEntry entries = 1; // Log entries matching the request criteria
+  bool has_more = 2;                // True if more entries exist beyond the limit
+}
+
 ///////////////////////////////////////////////////////////////////////////////////////////////
 // Services
 ///////////////////////////////////////////////////////////////////////////////////////////////
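
As a usage sketch (not part of this commit), retrieving logs with the new messages might look like the following, assuming prost-generated Rust types and a hypothetical `fetch_logs` transport helper:

    // Hypothetical sketch: fetch WARN-and-above logs for one job run.
    // `fetch_logs` stands in for whatever RPC the service exposes.
    let request = JobLogsRequest {
        job_run_id: "example-run-id".to_string(), // placeholder run id
        since_timestamp: 0, // from the start of the run
        min_level: 2,       // 2 = WARN, per the LogLevel comment above
        limit: 100,
    };
    let response: JobLogsResponse = fetch_logs(request)?;
    for entry in &response.entries {
        // Each entry now carries every output partition, not a single ref.
        for output in &entry.outputs {
            println!("{} #{} {}", entry.timestamp, entry.sequence_number, output.r#str);
        }
    }
    if response.has_more {
        // Page forward by re-issuing the request with a later since_timestamp.
    }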

View file

@@ -79,11 +79,11 @@ impl<S: LogSink> JobWrapper<S> {
         self.sequence_number
     }
 
-    fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) {
+    fn emit_log(&mut self, outputs: &[PartitionRef], content: job_log_entry::Content) {
         let entry = JobLogEntry {
             timestamp: get_timestamp(),
             job_id: self.job_id.clone(),
-            partition_ref: partition_ref.to_string(),
+            outputs: outputs.to_vec(),
             sequence_number: self.next_sequence(),
             content: Some(content),
         };
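
Since `emit_log` now borrows a slice and copies it with `to_vec()` per entry, callers can build the `Vec<PartitionRef>` once and reuse it for every emission. A minimal sketch of the new call shape (the refs are invented examples, and `..Default::default()` assumes prost-generated structs):

    // Build the output refs once...
    let output_refs: Vec<PartitionRef> = vec![
        PartitionRef { r#str: "data/reviews/2025-07-28".to_string() },
        PartitionRef { r#str: "data/stats/2025-07-28".to_string() },
    ];
    // ...then every call shares the same slice; each JobLogEntry gets its own copy.
    self.emit_log(
        &output_refs,
        job_log_entry::Content::JobEvent(WrapperJobEvent {
            event_type: "config_validate_success".to_string(),
            ..Default::default()
        }),
    );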
@@ -92,12 +92,15 @@ impl<S: LogSink> JobWrapper<S> {
     }
 
     fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> {
-        // Parse the partition ref from args (first argument)
-        let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone();
+        // Convert to PartitionRef objects
+        let output_refs: Vec<PartitionRef> = outputs
+            .iter()
+            .map(|s| PartitionRef { r#str: s.clone() })
+            .collect();
 
         // Following the state diagram: wrapper_validate_config -> emit_config_validate_success
         self.emit_log(
-            &partition_ref,
+            &output_refs,
             job_log_entry::Content::JobEvent(WrapperJobEvent {
                 event_type: "config_validate_success".to_string(),
                 metadata: std::collections::HashMap::new(),
@@ -109,10 +112,7 @@ impl<S: LogSink> JobWrapper<S> {
         // For Phase 0, we still need to produce the expected JSON config format
         // so the current graph system can parse it. Later phases will change this.
         let config = JobConfig {
-            outputs: outputs
-                .iter()
-                .map(|s| PartitionRef { r#str: s.clone() })
-                .collect(),
+            outputs: output_refs.clone(),
             inputs: vec![],
             args: outputs.clone(),
             env: {
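
The `output_refs.clone()` above reuses the Vec built at the top of `config_mode`, replacing a second identical map over `outputs`, so the emitted logs and the JSON config come from one conversion. Roughly:

    // Sketch of the shared conversion: build once, consume twice.
    let output_refs: Vec<PartitionRef> =
        outputs.iter().map(|s| PartitionRef { r#str: s.clone() }).collect();
    let for_logs = &output_refs;          // borrowed by each emit_log call
    let for_config = output_refs.clone(); // owned by the JobConfig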
@@ -149,16 +149,12 @@ impl<S: LogSink> JobWrapper<S> {
         job_binary: &str,
         config: JobConfig,
     ) -> Result<(), Box<dyn std::error::Error>> {
-        let partition_ref = config
-            .outputs
-            .first()
-            .map(|p| p.str.clone())
-            .unwrap_or_else(|| "unknown".to_string());
+        let outputs = &config.outputs;
 
         // Following the state diagram:
         // 1. wrapper_validate_config -> emit_config_validate_success
         self.emit_log(
-            &partition_ref,
+            outputs,
             job_log_entry::Content::JobEvent(WrapperJobEvent {
                 event_type: "config_validate_success".to_string(),
                 job_status: None,
@@ -169,7 +165,7 @@ impl<S: LogSink> JobWrapper<S> {
         // 2. wrapper_launch_task -> emit_task_launch_success
         self.emit_log(
-            &partition_ref,
+            outputs,
             job_log_entry::Content::JobEvent(WrapperJobEvent {
                 event_type: "task_launch_success".to_string(),
                 job_status: None,
@@ -206,7 +202,7 @@ impl<S: LogSink> JobWrapper<S> {
         // Start heartbeat thread with channel communication
         let heartbeat_job_id = self.job_id.clone();
-        let heartbeat_partition_ref = partition_ref.clone();
+        let heartbeat_outputs = outputs.clone();
         let heartbeat_sequence = Arc::new(Mutex::new(0u64));
         let heartbeat_sequence_clone = heartbeat_sequence.clone();
         let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<HeartbeatMessage>();
@@ -249,7 +245,7 @@ impl<S: LogSink> JobWrapper<S> {
                 let heartbeat_event = JobLogEntry {
                     timestamp: get_timestamp(),
                     job_id: heartbeat_job_id.clone(),
-                    partition_ref: heartbeat_partition_ref.clone(),
+                    outputs: heartbeat_outputs.clone(),
                     sequence_number: seq,
                     content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent {
                         event_type: "heartbeat".to_string(),
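
Worth noting: the heartbeat closure is moved onto its own thread, so it needs an owned `Vec<PartitionRef>` (the `outputs.clone()` above) rather than the `&config.outputs` borrow, which cannot satisfy the `'static` bound on `thread::spawn`. A minimal sketch of that constraint:

    use std::thread;

    let heartbeat_outputs = outputs.clone(); // owned Vec<PartitionRef>
    thread::spawn(move || {
        // heartbeat_outputs lives for the life of the thread; each tick
        // clones it again into the JobLogEntry it emits.
        let _per_tick = heartbeat_outputs.clone();
    });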
@@ -378,7 +374,7 @@ impl<S: LogSink> JobWrapper<S> {
         if !output.stdout.is_empty() {
             let stdout_str = String::from_utf8_lossy(&output.stdout);
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::Log(LogMessage {
                     level: log_message::LogLevel::Info as i32,
                     message: stdout_str.to_string(),
@@ -390,7 +386,7 @@ impl<S: LogSink> JobWrapper<S> {
         if !output.stderr.is_empty() {
             let stderr_str = String::from_utf8_lossy(&output.stderr);
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::Log(LogMessage {
                     level: log_message::LogLevel::Error as i32,
                     message: stderr_str.to_string(),
@@ -413,7 +409,7 @@ impl<S: LogSink> JobWrapper<S> {
         summary_metadata.insert("exit_code".to_string(), exit_code.to_string());
         self.emit_log(
-            &partition_ref,
+            outputs,
             job_log_entry::Content::JobEvent(WrapperJobEvent {
                 event_type: "job_summary".to_string(),
                 job_status: None,
@@ -425,7 +421,7 @@ impl<S: LogSink> JobWrapper<S> {
         if success {
             // Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::JobEvent(WrapperJobEvent {
                     event_type: "task_success".to_string(),
                     job_status: Some("JOB_COMPLETED".to_string()),
@@ -441,7 +437,7 @@ impl<S: LogSink> JobWrapper<S> {
                 .as_secs() as i64;
 
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::Manifest(PartitionManifest {
                     outputs: config.outputs.clone(),
                     inputs: vec![], // Phase 0: no input manifests yet
@@ -460,7 +456,7 @@ impl<S: LogSink> JobWrapper<S> {
         } else {
             // Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::JobEvent(WrapperJobEvent {
                     event_type: "task_failed".to_string(),
                     job_status: Some("JOB_FAILED".to_string()),
@@ -471,7 +467,7 @@ impl<S: LogSink> JobWrapper<S> {
             // Then emit_job_exec_fail -> fail (don't emit partition manifest on failure)
             self.emit_log(
-                &partition_ref,
+                outputs,
                 job_log_entry::Content::JobEvent(WrapperJobEvent {
                     event_type: "job_exec_fail".to_string(),
                     job_status: Some("JOB_FAILED".to_string()),
@@ -590,7 +586,7 @@ mod tests {
         let entry = JobLogEntry {
             timestamp: "1234567890".to_string(),
             job_id: "test-id".to_string(),
-            partition_ref: "test/partition".to_string(),
+            outputs: vec![PartitionRef { r#str: "test/partition".to_string() }],
             sequence_number: 1,
             content: Some(job_log_entry::Content::Log(LogMessage {
                 level: log_message::LogLevel::Info as i32,