diff --git a/databuild/databuild.proto b/databuild/databuild.proto index bbe464e..98cf258 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -22,8 +22,9 @@ enum DepType { // Represents a data dependency message DataDep { - DepType dep_type = 1; - PartitionRef partition_ref = 2; + DepType dep_type_code = 1; // Enum for programmatic use + string dep_type_name = 2; // Human-readable string ("query", "materialize") + PartitionRef partition_ref = 3; // Moved from field 2 to 3 } // Configuration for a job @@ -389,6 +390,141 @@ message BuildSummary { bool cancelled = 13; } +/////////////////////////////////////////////////////////////////////////////////////////////// +// Detail Operations (Unified CLI/Service Detail Responses) +/////////////////////////////////////////////////////////////////////////////////////////////// + +// +// Build Detail +// + +message BuildDetailRequest { + string build_request_id = 1; +} + +message BuildDetailResponse { + string build_request_id = 1; + BuildRequestStatus status_code = 2; // Enum for programmatic use + string status_name = 3; // Human-readable string + repeated PartitionRef requested_partitions = 4; + uint32 total_jobs = 5; + uint32 completed_jobs = 6; + uint32 failed_jobs = 7; + uint32 cancelled_jobs = 8; + int64 requested_at = 9; + optional int64 started_at = 10; + optional int64 completed_at = 11; + optional int64 duration_ms = 12; + bool cancelled = 13; + optional string cancel_reason = 14; + repeated BuildTimelineEvent timeline = 15; +} + +message BuildTimelineEvent { + int64 timestamp = 1; + optional BuildRequestStatus status_code = 2; // Enum for programmatic use + optional string status_name = 3; // Human-readable string + string message = 4; + string event_type = 5; + optional string cancel_reason = 6; +} + +// +// Partition Detail +// + +message PartitionDetailRequest { + string partition_ref = 1; +} + +message PartitionDetailResponse { + string partition_ref = 1; + PartitionStatus status_code = 2; // Enum for programmatic use + string status_name = 3; // Human-readable string + int64 last_updated = 4; + uint32 builds_count = 5; + optional string last_successful_build = 6; + uint32 invalidation_count = 7; + repeated PartitionTimelineEvent timeline = 8; +} + +message PartitionTimelineEvent { + int64 timestamp = 1; + PartitionStatus status_code = 2; // Enum for programmatic use + string status_name = 3; // Human-readable string + string message = 4; + string build_request_id = 5; + optional string job_run_id = 6; +} + +// +// Job Detail +// + +message JobDetailRequest { + string job_label = 1; +} + +message JobDetailResponse { + string job_label = 1; + uint32 total_runs = 2; + uint32 successful_runs = 3; + uint32 failed_runs = 4; + uint32 cancelled_runs = 5; + double average_partitions_per_run = 6; + int64 last_run_timestamp = 7; + JobStatus last_run_status_code = 8; // Enum for programmatic use + string last_run_status_name = 9; // Human-readable string + repeated string recent_builds = 10; + repeated JobRunDetail runs = 11; +} + +message JobRunDetail { + string job_run_id = 1; + string build_request_id = 2; + repeated PartitionRef target_partitions = 3; + JobStatus status_code = 4; // Enum for programmatic use + string status_name = 5; // Human-readable string + optional int64 started_at = 6; + optional int64 completed_at = 7; + optional int64 duration_ms = 8; + string message = 9; +} + +// +// Task Detail +// + +message TaskDetailRequest { + string job_run_id = 1; +} + +message TaskDetailResponse { + string job_run_id = 1; + string job_label = 2; + string build_request_id = 3; + JobStatus status_code = 4; // Enum for programmatic use + string status_name = 5; // Human-readable string + repeated PartitionRef target_partitions = 6; + int64 scheduled_at = 7; + optional int64 started_at = 8; + optional int64 completed_at = 9; + optional int64 duration_ms = 10; + bool cancelled = 11; + optional string cancel_reason = 12; + string message = 13; + repeated TaskTimelineEvent timeline = 14; +} + +message TaskTimelineEvent { + int64 timestamp = 1; + optional JobStatus status_code = 2; // Enum for programmatic use + optional string status_name = 3; // Human-readable string + string message = 4; + string event_type = 5; + optional string cancel_reason = 6; +} + /////////////////////////////////////////////////////////////////////////////////////////////// // Services /////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/databuild/graph/analyze.rs b/databuild/graph/analyze.rs index 9fdd8a9..2f0b534 100644 --- a/databuild/graph/analyze.rs +++ b/databuild/graph/analyze.rs @@ -309,7 +309,7 @@ async fn plan( let mut new_unhandled_count = 0; for task in &new_nodes { for input in &task.config.as_ref().unwrap().inputs { - if input.dep_type == 1 { // MATERIALIZE = 1 + if input.dep_type_code == 1 { // MATERIALIZE = 1 if !unhandled_refs.contains(&input.partition_ref.as_ref().unwrap().str) { new_unhandled_count += 1; } diff --git a/databuild/graph/execute.rs b/databuild/graph/execute.rs index c023f47..cad09ab 100644 --- a/databuild/graph/execute.rs +++ b/databuild/graph/execute.rs @@ -248,7 +248,7 @@ fn is_task_ready(task: &Task, completed_outputs: &HashSet) -> bool { let mut missing_deps = Vec::new(); for dep in &task.config.as_ref().unwrap().inputs { - if dep.dep_type == 1 { // MATERIALIZE = 1 + if dep.dep_type_code == 1 { // MATERIALIZE = 1 if !completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str) { missing_deps.push(&dep.partition_ref.as_ref().unwrap().str); } @@ -697,7 +697,7 @@ async fn main() -> Result<(), Box> { warn!("Pending task: {} ({})", task.job.as_ref().unwrap().label, key); warn!(" Required inputs:"); for dep in &task.config.as_ref().unwrap().inputs { - if dep.dep_type == 1 { // MATERIALIZE = 1 + if dep.dep_type_code == 1 { // MATERIALIZE = 1 let available = completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str); warn!(" {} - {}", dep.partition_ref.as_ref().unwrap().str, if available { "AVAILABLE" } else { "MISSING" }); } diff --git a/databuild/mermaid_utils.rs b/databuild/mermaid_utils.rs index 393d4bf..59a5e93 100644 --- a/databuild/mermaid_utils.rs +++ b/databuild/mermaid_utils.rs @@ -543,7 +543,7 @@ pub fn generate_mermaid_with_status( for input in &config.inputs { if let Some(partition_ref) = &input.partition_ref { let ref_id = builder.add_partition_node(&partition_ref.str); - let edge_type = if input.dep_type == 1 { + let edge_type = if input.dep_type_code == 1 { EdgeType::Solid } else { EdgeType::Dotted @@ -661,7 +661,8 @@ mod tests { inputs: vec![{ let mut input = DataDep::default(); input.partition_ref = Some(PartitionRef { str: "input/data".to_string() }); - input.dep_type = 1; // Solid dependency + input.dep_type_code = 1; // Solid dependency + input.dep_type_name = "materialize".to_string(); input }], outputs: vec![ @@ -678,7 +679,8 @@ mod tests { inputs: vec![{ let mut input = DataDep::default(); input.partition_ref = Some(PartitionRef { str: "intermediate/data".to_string() }); - input.dep_type = 2; // Dotted dependency + input.dep_type_code = 0; // Dotted dependency + input.dep_type_name = "query".to_string(); input }], outputs: vec![ @@ -789,7 +791,8 @@ mod tests { inputs: vec![{ let mut input = DataDep::default(); input.partition_ref = Some(PartitionRef { str: "input/data".to_string() }); - input.dep_type = 1; // Solid dependency + input.dep_type_code = 1; // Solid dependency + input.dep_type_name = "materialize".to_string(); input }], outputs: vec![ @@ -859,7 +862,8 @@ mod tests { inputs: vec![{ let mut input = DataDep::default(); input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() }); - input.dep_type = 1; + input.dep_type_code = 1; + input.dep_type_name = "materialize".to_string(); input }], outputs: vec![ @@ -875,7 +879,8 @@ mod tests { inputs: vec![{ let mut input = DataDep::default(); input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() }); - input.dep_type = 1; + input.dep_type_code = 1; + input.dep_type_name = "materialize".to_string(); input }], outputs: vec![ diff --git a/databuild/repositories/builds/mod.rs b/databuild/repositories/builds/mod.rs index dee0595..a9c71ba 100644 --- a/databuild/repositories/builds/mod.rs +++ b/databuild/repositories/builds/mod.rs @@ -276,6 +276,49 @@ impl BuildsRepository { Ok(build_info.map(|info| (info, timeline))) } + /// Show detailed information about a specific build using protobuf response format + /// + /// Returns the complete build details with dual status fields and timeline events. + pub async fn show_protobuf(&self, build_request_id: &str) -> Result> { + // Get build info and timeline using existing show method + if let Some((build_info, timeline)) = self.show(build_request_id).await? { + // Convert timeline events to protobuf format + let protobuf_timeline: Vec = timeline + .into_iter() + .map(|event| BuildTimelineEvent { + timestamp: event.timestamp, + status_code: event.status.map(|s| s as i32), + status_name: event.status.map(|s| s.to_display_string()), + message: event.message, + event_type: event.event_type, + cancel_reason: event.cancel_reason, + }) + .collect(); + + let response = BuildDetailResponse { + build_request_id: build_info.build_request_id, + status_code: build_info.status as i32, + status_name: build_info.status.to_display_string(), + requested_partitions: build_info.requested_partitions, + total_jobs: build_info.total_jobs as u32, + completed_jobs: build_info.completed_jobs as u32, + failed_jobs: build_info.failed_jobs as u32, + cancelled_jobs: build_info.cancelled_jobs as u32, + requested_at: build_info.requested_at, + started_at: build_info.started_at, + completed_at: build_info.completed_at, + duration_ms: build_info.duration_ms, + cancelled: build_info.cancelled, + cancel_reason: build_info.cancel_reason, + timeline: protobuf_timeline, + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } + /// Cancel a build with a reason /// /// This method uses the EventWriter to write a build cancellation event. diff --git a/databuild/repositories/jobs/mod.rs b/databuild/repositories/jobs/mod.rs index dda5f97..6d85d2e 100644 --- a/databuild/repositories/jobs/mod.rs +++ b/databuild/repositories/jobs/mod.rs @@ -293,6 +293,48 @@ impl JobsRepository { Ok(Some((job_info, job_runs))) } + + /// Show detailed information about a specific job using protobuf response format + /// + /// Returns the complete job details with dual status fields and run details. + pub async fn show_protobuf(&self, job_label: &str) -> Result> { + // Get job info and runs using existing show method + if let Some((job_info, job_runs)) = self.show(job_label).await? { + // Convert job runs to protobuf format + let protobuf_runs: Vec = job_runs + .into_iter() + .map(|run| crate::JobRunDetail { + job_run_id: run.job_run_id, + build_request_id: run.build_request_id, + target_partitions: run.target_partitions, + status_code: run.status as i32, + status_name: run.status.to_display_string(), + started_at: run.started_at, + completed_at: run.completed_at, + duration_ms: run.duration_ms, + message: run.message, + }) + .collect(); + + let response = JobDetailResponse { + job_label: job_info.job_label, + total_runs: job_info.total_runs as u32, + successful_runs: job_info.successful_runs as u32, + failed_runs: job_info.failed_runs as u32, + cancelled_runs: job_info.cancelled_runs as u32, + average_partitions_per_run: job_info.average_partitions_per_run, + last_run_timestamp: job_info.last_run_timestamp, + last_run_status_code: job_info.last_run_status as i32, + last_run_status_name: job_info.last_run_status.to_display_string(), + recent_builds: job_info.recent_builds, + runs: protobuf_runs, + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } } #[cfg(test)] diff --git a/databuild/repositories/partitions/mod.rs b/databuild/repositories/partitions/mod.rs index 5ab73ae..23ad70d 100644 --- a/databuild/repositories/partitions/mod.rs +++ b/databuild/repositories/partitions/mod.rs @@ -249,6 +249,42 @@ impl PartitionsRepository { event_writer.invalidate_partition(build_request_id, partition, reason).await } + /// Show detailed information about a specific partition using protobuf response format + /// + /// Returns the complete partition details with dual status fields and timeline events. + pub async fn show_protobuf(&self, partition_ref: &str) -> Result> { + // Get partition info and timeline using existing show method + if let Some((partition_info, timeline)) = self.show(partition_ref).await? { + // Convert timeline events to protobuf format + let protobuf_timeline: Vec = timeline + .into_iter() + .map(|event| PartitionTimelineEvent { + timestamp: event.timestamp, + status_code: event.status as i32, + status_name: event.status.to_display_string(), + message: event.message, + build_request_id: event.build_request_id, + job_run_id: event.job_run_id, + }) + .collect(); + + let response = PartitionDetailResponse { + partition_ref: partition_info.partition_ref, + status_code: partition_info.current_status as i32, + status_name: partition_info.current_status.to_display_string(), + last_updated: partition_info.last_updated, + builds_count: partition_info.builds_count as u32, + last_successful_build: partition_info.last_successful_build, + invalidation_count: partition_info.invalidation_count as u32, + timeline: protobuf_timeline, + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } + /// List partitions returning protobuf response format with dual status fields /// /// This method provides the unified CLI/Service response format with both diff --git a/databuild/repositories/tasks/mod.rs b/databuild/repositories/tasks/mod.rs index d7c726c..94ce28e 100644 --- a/databuild/repositories/tasks/mod.rs +++ b/databuild/repositories/tasks/mod.rs @@ -297,6 +297,48 @@ impl TasksRepository { let event_writer = crate::event_log::writer::EventWriter::new(self.event_log.clone()); event_writer.cancel_task(build_request_id, job_run_id.to_string(), reason).await } + + /// Show detailed information about a specific task using protobuf response format + /// + /// Returns the complete task details with dual status fields and timeline events. + pub async fn show_protobuf(&self, job_run_id: &str) -> Result> { + // Get task info and timeline using existing show method + if let Some((task_info, timeline)) = self.show(job_run_id).await? { + // Convert timeline events to protobuf format + let protobuf_timeline: Vec = timeline + .into_iter() + .map(|event| TaskTimelineEvent { + timestamp: event.timestamp, + status_code: event.status.map(|s| s as i32), + status_name: event.status.map(|s| s.to_display_string()), + message: event.message, + event_type: event.event_type, + cancel_reason: event.cancel_reason, + }) + .collect(); + + let response = TaskDetailResponse { + job_run_id: task_info.job_run_id, + job_label: task_info.job_label, + build_request_id: task_info.build_request_id, + status_code: task_info.status as i32, + status_name: task_info.status.to_display_string(), + target_partitions: task_info.target_partitions, + scheduled_at: task_info.scheduled_at, + started_at: task_info.started_at, + completed_at: task_info.completed_at, + duration_ms: task_info.duration_ms, + cancelled: task_info.cancelled, + cancel_reason: task_info.cancel_reason, + message: task_info.message, + timeline: protobuf_timeline, + }; + + Ok(Some(response)) + } else { + Ok(None) + } + } } #[cfg(test)] diff --git a/databuild/status_utils.rs b/databuild/status_utils.rs index ba38de3..ca93125 100644 --- a/databuild/status_utils.rs +++ b/databuild/status_utils.rs @@ -92,6 +92,25 @@ impl BuildRequestStatus { } } +impl DepType { + /// Convert dependency type to human-readable string + pub fn to_display_string(&self) -> String { + match self { + DepType::Query => "query".to_string(), + DepType::Materialize => "materialize".to_string(), + } + } + + /// Parse a display string back to enum + pub fn from_display_string(s: &str) -> Option { + match s { + "query" => Some(DepType::Query), + "materialize" => Some(DepType::Materialize), + _ => None, + } + } +} + /// Helper functions for creating protobuf list responses with dual status fields pub mod list_response_helpers { use super::*; @@ -203,6 +222,18 @@ pub mod list_response_helpers { cancelled, } } + + /// Create a DataDep with dual fields from repository data + pub fn create_data_dep( + dep_type: DepType, + partition_ref: PartitionRef, + ) -> DataDep { + DataDep { + dep_type_code: dep_type as i32, + dep_type_name: dep_type.to_display_string(), + partition_ref: Some(partition_ref), + } + } } #[cfg(test)] @@ -230,10 +261,22 @@ mod tests { assert_eq!(BuildRequestStatus::from_display_string("completed"), Some(status)); } + #[test] + fn test_dep_type_conversions() { + let dep_type = DepType::Materialize; + assert_eq!(dep_type.to_display_string(), "materialize"); + assert_eq!(DepType::from_display_string("materialize"), Some(dep_type)); + + let dep_type = DepType::Query; + assert_eq!(dep_type.to_display_string(), "query"); + assert_eq!(DepType::from_display_string("query"), Some(dep_type)); + } + #[test] fn test_invalid_display_string() { assert_eq!(PartitionStatus::from_display_string("invalid"), None); assert_eq!(JobStatus::from_display_string("invalid"), None); assert_eq!(BuildRequestStatus::from_display_string("invalid"), None); + assert_eq!(DepType::from_display_string("invalid"), None); } } \ No newline at end of file diff --git a/examples/podcast_reviews/categorize_reviews_job.py b/examples/podcast_reviews/categorize_reviews_job.py index 4cfc695..abe4c3a 100644 --- a/examples/podcast_reviews/categorize_reviews_job.py +++ b/examples/podcast_reviews/categorize_reviews_job.py @@ -55,8 +55,8 @@ def handle_config(args): configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": reviews_ref}}, - {"dep_type": 1, "partition_ref": {"str": podcasts_ref}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": reviews_ref}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": podcasts_ref}} ], "args": [category, date_str], "env": { @@ -135,8 +135,8 @@ def handle_exec(args): "config": { "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": f"reviews/date={target_date}"}}, - {"dep_type": 1, "partition_ref": {"str": "podcasts/all"}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"reviews/date={target_date}"}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": "podcasts/all"}} ], "args": [target_category, target_date], "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date} diff --git a/examples/podcast_reviews/daily_summary_job.py b/examples/podcast_reviews/daily_summary_job.py index 774d4fa..be00716 100644 --- a/examples/podcast_reviews/daily_summary_job.py +++ b/examples/podcast_reviews/daily_summary_job.py @@ -55,8 +55,8 @@ def handle_config(args): configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}}, - {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_stats_ref}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}} ], "args": [category, date_str], "env": { @@ -125,8 +125,8 @@ def handle_exec(args): "config": { "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}}, - {"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} ], "args": [target_category, target_date], "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date} diff --git a/examples/podcast_reviews/phrase_modeling_job.py b/examples/podcast_reviews/phrase_modeling_job.py index ac95246..b840e64 100644 --- a/examples/podcast_reviews/phrase_modeling_job.py +++ b/examples/podcast_reviews/phrase_modeling_job.py @@ -56,7 +56,7 @@ def handle_config(args): configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}} ], "args": [category, date_str], "env": { @@ -113,7 +113,7 @@ def handle_exec(args): "config": { "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} ], "args": [target_category, target_date], "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date} diff --git a/examples/podcast_reviews/phrase_stats_job.py b/examples/podcast_reviews/phrase_stats_job.py index a20f567..44172c2 100644 --- a/examples/podcast_reviews/phrase_stats_job.py +++ b/examples/podcast_reviews/phrase_stats_job.py @@ -55,8 +55,8 @@ def handle_config(args): configs.append({ "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": phrase_models_ref}}, - {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_models_ref}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}} ], "args": [category, date_str], "env": { @@ -125,8 +125,8 @@ def handle_exec(args): "config": { "outputs": [{"str": partition_ref}], "inputs": [ - {"dep_type": 1, "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}}, - {"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}}, + {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}} ], "args": [target_category, target_date], "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}