diff --git a/databuild/databuild.proto b/databuild/databuild.proto index d3adee0..36a47ee 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -284,7 +284,7 @@ message BuildEvent { message JobLogEntry { string timestamp = 1; // Unix timestamp string job_id = 2; // UUID for this job execution - string partition_ref = 3; // Primary partition being processed + repeated PartitionRef outputs = 3; // Partitions being processed by this job uint64 sequence_number = 4; // Monotonic sequence starting from 1 oneof content { @@ -589,6 +589,24 @@ message TaskTimelineEvent { optional string cancel_reason = 6; } +/////////////////////////////////////////////////////////////////////////////////////////////// +// Job Log Access (Unified CLI/Service Interface) +/////////////////////////////////////////////////////////////////////////////////////////////// + +// Request for retrieving job logs +message JobLogsRequest { + string job_run_id = 1; // UUID of the job run + int64 since_timestamp = 2; // Unix timestamp (nanoseconds) - only logs after this time + int32 min_level = 3; // Minimum LogLevel enum value (0=DEBUG, 1=INFO, 2=WARN, 3=ERROR) + uint32 limit = 4; // Maximum number of entries to return +} + +// Response containing job log entries +message JobLogsResponse { + repeated JobLogEntry entries = 1; // Log entries matching the request criteria + bool has_more = 2; // True if more entries exist beyond the limit +} + /////////////////////////////////////////////////////////////////////////////////////////////// // Services /////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/databuild/job/main.rs b/databuild/job/main.rs index fe17f86..fe74451 100644 --- a/databuild/job/main.rs +++ b/databuild/job/main.rs @@ -79,11 +79,11 @@ impl JobWrapper { self.sequence_number } - fn emit_log(&mut self, partition_ref: &str, content: job_log_entry::Content) { + fn emit_log(&mut self, outputs: &[PartitionRef], content: 
job_log_entry::Content) { let entry = JobLogEntry { timestamp: get_timestamp(), job_id: self.job_id.clone(), - partition_ref: partition_ref.to_string(), + outputs: outputs.to_vec(), sequence_number: self.next_sequence(), content: Some(content), }; @@ -92,12 +92,15 @@ impl JobWrapper { } fn config_mode(&mut self, outputs: Vec<String>) -> Result<(), Box<dyn std::error::Error>> { - // Parse the partition ref from args (first argument) - let partition_ref = outputs.first().unwrap_or(&"unknown".to_string()).clone(); + // Convert to PartitionRef objects + let output_refs: Vec<PartitionRef> = outputs + .iter() + .map(|s| PartitionRef { r#str: s.clone() }) + .collect(); // Following the state diagram: wrapper_validate_config -> emit_config_validate_success self.emit_log( - &partition_ref, + &output_refs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "config_validate_success".to_string(), metadata: std::collections::HashMap::new(), @@ -109,10 +112,7 @@ impl JobWrapper { // For Phase 0, we still need to produce the expected JSON config format // so the current graph system can parse it. Later phases will change this. let config = JobConfig { - outputs: outputs - .iter() - .map(|s| PartitionRef { r#str: s.clone() }) - .collect(), + outputs: output_refs.clone(), inputs: vec![], args: outputs.clone(), env: { @@ -149,16 +149,12 @@ impl JobWrapper { job_binary: &str, config: JobConfig, ) -> Result<(), Box<dyn std::error::Error>> { - let partition_ref = config - .outputs - .first() - .map(|p| p.str.clone()) - .unwrap_or_else(|| "unknown".to_string()); + let outputs = &config.outputs; // Following the state diagram: // 1. wrapper_validate_config -> emit_config_validate_success self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "config_validate_success".to_string(), job_status: None, @@ -169,7 +165,7 @@ impl JobWrapper { // 2. 
wrapper_launch_task -> emit_task_launch_success self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "task_launch_success".to_string(), job_status: None, @@ -206,7 +202,7 @@ impl JobWrapper { // Start heartbeat thread with channel communication let heartbeat_job_id = self.job_id.clone(); - let heartbeat_partition_ref = partition_ref.clone(); + let heartbeat_outputs = outputs.clone(); let heartbeat_sequence = Arc::new(Mutex::new(0u64)); let heartbeat_sequence_clone = heartbeat_sequence.clone(); let (heartbeat_tx, heartbeat_rx) = mpsc::channel::(); @@ -249,7 +245,7 @@ impl JobWrapper { let heartbeat_event = JobLogEntry { timestamp: get_timestamp(), job_id: heartbeat_job_id.clone(), - partition_ref: heartbeat_partition_ref.clone(), + outputs: heartbeat_outputs.clone(), sequence_number: seq, content: Some(job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "heartbeat".to_string(), @@ -378,7 +374,7 @@ impl JobWrapper { if !output.stdout.is_empty() { let stdout_str = String::from_utf8_lossy(&output.stdout); self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::Log(LogMessage { level: log_message::LogLevel::Info as i32, message: stdout_str.to_string(), @@ -390,7 +386,7 @@ impl JobWrapper { if !output.stderr.is_empty() { let stderr_str = String::from_utf8_lossy(&output.stderr); self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::Log(LogMessage { level: log_message::LogLevel::Error as i32, message: stderr_str.to_string(), @@ -413,7 +409,7 @@ impl JobWrapper { summary_metadata.insert("exit_code".to_string(), exit_code.to_string()); self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "job_summary".to_string(), job_status: None, @@ -425,7 +421,7 @@ impl JobWrapper { if success { // Following the state diagram: wrapper_monitor_task -> zero exit -> emit_task_success self.emit_log( - &partition_ref, + outputs, 
job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "task_success".to_string(), job_status: Some("JOB_COMPLETED".to_string()), @@ -441,7 +437,7 @@ impl JobWrapper { .as_secs() as i64; self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::Manifest(PartitionManifest { outputs: config.outputs.clone(), inputs: vec![], // Phase 0: no input manifests yet @@ -460,7 +456,7 @@ impl JobWrapper { } else { // Following the state diagram: wrapper_monitor_task -> non-zero exit -> emit_task_failed self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "task_failed".to_string(), job_status: Some("JOB_FAILED".to_string()), @@ -471,7 +467,7 @@ impl JobWrapper { // Then emit_job_exec_fail -> fail (don't emit partition manifest on failure) self.emit_log( - &partition_ref, + outputs, job_log_entry::Content::JobEvent(WrapperJobEvent { event_type: "job_exec_fail".to_string(), job_status: Some("JOB_FAILED".to_string()), @@ -590,7 +586,7 @@ mod tests { let entry = JobLogEntry { timestamp: "1234567890".to_string(), job_id: "test-id".to_string(), - partition_ref: "test/partition".to_string(), + outputs: vec![PartitionRef { r#str: "test/partition".to_string() }], sequence_number: 1, content: Some(job_log_entry::Content::Log(LogMessage { level: log_message::LogLevel::Info as i32,