Code/name WIP

This commit is contained in:
Stuart Axelbrooke 2025-07-20 16:01:40 -07:00
parent dcc71bd13b
commit 894bbc35bd
13 changed files with 372 additions and 25 deletions

View file

@ -22,8 +22,9 @@ enum DepType {
// Represents a data dependency
message DataDep {
DepType dep_type = 1;
PartitionRef partition_ref = 2;
DepType dep_type_code = 1; // Enum for programmatic use
string dep_type_name = 2; // Human-readable string ("query", "materialize")
PartitionRef partition_ref = 3; // Moved from field 2 to 3
}
// Configuration for a job
@ -389,6 +390,141 @@ message BuildSummary {
bool cancelled = 13;
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Detail Operations (Unified CLI/Service Detail Responses)
///////////////////////////////////////////////////////////////////////////////////////////////
//
// Build Detail
//
// Request for full detail on a single build, keyed by its build request id.
message BuildDetailRequest {
string build_request_id = 1; // ID of the build to fetch details for
}
// Full detail for one build request: aggregate job counts plus a per-event
// timeline. Every status is carried as a dual field (enum code + display name).
message BuildDetailResponse {
string build_request_id = 1;
BuildRequestStatus status_code = 2; // Enum for programmatic use
string status_name = 3; // Human-readable string
repeated PartitionRef requested_partitions = 4; // Partitions the caller asked to build
uint32 total_jobs = 5;
uint32 completed_jobs = 6;
uint32 failed_jobs = 7;
uint32 cancelled_jobs = 8;
int64 requested_at = 9; // Epoch timestamp; units not shown here — presumably ms (confirm)
optional int64 started_at = 10; // Absent if the build has not started
optional int64 completed_at = 11; // Absent if the build has not finished
optional int64 duration_ms = 12; // Absent until a duration can be computed
bool cancelled = 13;
optional string cancel_reason = 14; // Present only for cancelled builds
repeated BuildTimelineEvent timeline = 15; // Chronological build events
}
// One event in a build's timeline. Status fields are optional because some
// events (see event_type) may not correspond to a status transition.
message BuildTimelineEvent {
int64 timestamp = 1; // Epoch timestamp of the event
optional BuildRequestStatus status_code = 2; // Enum for programmatic use
optional string status_name = 3; // Human-readable string
string message = 4; // Free-form event description
string event_type = 5; // Event discriminator (string-typed; values not shown here)
optional string cancel_reason = 6; // Present only on cancellation events
}
//
// Partition Detail
//
// Request for full detail on a single partition, keyed by its string ref.
message PartitionDetailRequest {
string partition_ref = 1; // Partition reference string to look up
}
// Full detail for one partition: current status (dual code/name fields),
// build statistics, and a per-event timeline.
message PartitionDetailResponse {
string partition_ref = 1;
PartitionStatus status_code = 2; // Enum for programmatic use
string status_name = 3; // Human-readable string
int64 last_updated = 4; // Epoch timestamp of the most recent status change
uint32 builds_count = 5; // Number of builds that touched this partition
optional string last_successful_build = 6; // Build request id; absent if never built successfully
uint32 invalidation_count = 7; // Times this partition has been invalidated
repeated PartitionTimelineEvent timeline = 8; // Chronological partition events
}
// One event in a partition's timeline, linked back to the build (and
// optionally the job run) that produced it.
message PartitionTimelineEvent {
int64 timestamp = 1; // Epoch timestamp of the event
PartitionStatus status_code = 2; // Enum for programmatic use
string status_name = 3; // Human-readable string
string message = 4; // Free-form event description
string build_request_id = 5; // Build that caused this event
optional string job_run_id = 6; // Run that caused it, when applicable
}
//
// Job Detail
//
// Request for full detail on a single job, keyed by its label.
message JobDetailRequest {
string job_label = 1; // Job label to look up
}
// Full detail for one job: run statistics, last-run status (dual code/name
// fields), and the individual run records.
message JobDetailResponse {
string job_label = 1;
uint32 total_runs = 2;
uint32 successful_runs = 3;
uint32 failed_runs = 4;
uint32 cancelled_runs = 5;
double average_partitions_per_run = 6; // Mean partitions targeted per run
int64 last_run_timestamp = 7; // Epoch timestamp of the most recent run
JobStatus last_run_status_code = 8; // Enum for programmatic use
string last_run_status_name = 9; // Human-readable string
repeated string recent_builds = 10; // Recent build request ids involving this job
repeated JobRunDetail runs = 11; // Per-run details
}
// A single run of a job within a build, with dual status fields and timing.
message JobRunDetail {
string job_run_id = 1;
string build_request_id = 2; // Build this run belonged to
repeated PartitionRef target_partitions = 3; // Partitions this run was building
JobStatus status_code = 4; // Enum for programmatic use
string status_name = 5; // Human-readable string
optional int64 started_at = 6; // Absent if the run never started
optional int64 completed_at = 7; // Absent if the run never finished
optional int64 duration_ms = 8; // Absent until a duration can be computed
string message = 9; // Free-form status/outcome description
}
//
// Task Detail
//
// Request for full detail on a single task; tasks are keyed by job run id.
message TaskDetailRequest {
string job_run_id = 1; // Job run id identifying the task
}
// Full detail for one task (a scheduled job run): identity, dual status
// fields, timing, cancellation info, and a per-event timeline.
message TaskDetailResponse {
string job_run_id = 1;
string job_label = 2; // Label of the job this task runs
string build_request_id = 3; // Build this task belongs to
JobStatus status_code = 4; // Enum for programmatic use
string status_name = 5; // Human-readable string
repeated PartitionRef target_partitions = 6; // Partitions this task is building
int64 scheduled_at = 7; // Epoch timestamp when the task was scheduled
optional int64 started_at = 8; // Absent if the task has not started
optional int64 completed_at = 9; // Absent if the task has not finished
optional int64 duration_ms = 10; // Absent until a duration can be computed
bool cancelled = 11;
optional string cancel_reason = 12; // Present only for cancelled tasks
string message = 13; // Free-form status/outcome description
repeated TaskTimelineEvent timeline = 14; // Chronological task events
}
// One event in a task's timeline. Status fields are optional because some
// events (see event_type) may not correspond to a status transition.
message TaskTimelineEvent {
int64 timestamp = 1; // Epoch timestamp of the event
optional JobStatus status_code = 2; // Enum for programmatic use
optional string status_name = 3; // Human-readable string
string message = 4; // Free-form event description
string event_type = 5; // Event discriminator (string-typed; values not shown here)
optional string cancel_reason = 6; // Present only on cancellation events
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Services
///////////////////////////////////////////////////////////////////////////////////////////////

View file

@ -309,7 +309,7 @@ async fn plan(
let mut new_unhandled_count = 0;
for task in &new_nodes {
for input in &task.config.as_ref().unwrap().inputs {
if input.dep_type == 1 { // MATERIALIZE = 1
if input.dep_type_code == 1 { // MATERIALIZE = 1
if !unhandled_refs.contains(&input.partition_ref.as_ref().unwrap().str) {
new_unhandled_count += 1;
}

View file

@ -248,7 +248,7 @@ fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
let mut missing_deps = Vec::new();
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
if dep.dep_type_code == 1 { // MATERIALIZE = 1
if !completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str) {
missing_deps.push(&dep.partition_ref.as_ref().unwrap().str);
}
@ -697,7 +697,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
warn!("Pending task: {} ({})", task.job.as_ref().unwrap().label, key);
warn!(" Required inputs:");
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
if dep.dep_type_code == 1 { // MATERIALIZE = 1
let available = completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str);
warn!(" {} - {}", dep.partition_ref.as_ref().unwrap().str, if available { "AVAILABLE" } else { "MISSING" });
}

View file

@ -543,7 +543,7 @@ pub fn generate_mermaid_with_status(
for input in &config.inputs {
if let Some(partition_ref) = &input.partition_ref {
let ref_id = builder.add_partition_node(&partition_ref.str);
let edge_type = if input.dep_type == 1 {
let edge_type = if input.dep_type_code == 1 {
EdgeType::Solid
} else {
EdgeType::Dotted
@ -661,7 +661,8 @@ mod tests {
inputs: vec![{
let mut input = DataDep::default();
input.partition_ref = Some(PartitionRef { str: "input/data".to_string() });
input.dep_type = 1; // Solid dependency
input.dep_type_code = 1; // Solid dependency
input.dep_type_name = "materialize".to_string();
input
}],
outputs: vec![
@ -678,7 +679,8 @@ mod tests {
inputs: vec![{
let mut input = DataDep::default();
input.partition_ref = Some(PartitionRef { str: "intermediate/data".to_string() });
input.dep_type = 2; // Dotted dependency
input.dep_type_code = 0; // Dotted dependency
input.dep_type_name = "query".to_string();
input
}],
outputs: vec![
@ -789,7 +791,8 @@ mod tests {
inputs: vec![{
let mut input = DataDep::default();
input.partition_ref = Some(PartitionRef { str: "input/data".to_string() });
input.dep_type = 1; // Solid dependency
input.dep_type_code = 1; // Solid dependency
input.dep_type_name = "materialize".to_string();
input
}],
outputs: vec![
@ -859,7 +862,8 @@ mod tests {
inputs: vec![{
let mut input = DataDep::default();
input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() });
input.dep_type = 1;
input.dep_type_code = 1;
input.dep_type_name = "materialize".to_string();
input
}],
outputs: vec![
@ -875,7 +879,8 @@ mod tests {
inputs: vec![{
let mut input = DataDep::default();
input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() });
input.dep_type = 1;
input.dep_type_code = 1;
input.dep_type_name = "materialize".to_string();
input
}],
outputs: vec![

View file

@ -276,6 +276,49 @@ impl BuildsRepository {
Ok(build_info.map(|info| (info, timeline)))
}
/// Show detailed information about a specific build using protobuf response format
///
/// Delegates to [`show`] and translates the result into a `BuildDetailResponse`
/// with dual status fields (enum code + display name) and timeline events.
/// Returns `Ok(None)` when the build request id is unknown.
pub async fn show_protobuf(&self, build_request_id: &str) -> Result<Option<BuildDetailResponse>> {
    // Guard clause: bail out early when the build does not exist.
    let (build_info, timeline) = match self.show(build_request_id).await? {
        Some(found) => found,
        None => return Ok(None),
    };
    // Each timeline event carries both the enum code and its display name;
    // status is optional on events, hence the `map` calls.
    let timeline: Vec<BuildTimelineEvent> = timeline
        .into_iter()
        .map(|event| BuildTimelineEvent {
            timestamp: event.timestamp,
            status_code: event.status.map(|s| s as i32),
            status_name: event.status.map(|s| s.to_display_string()),
            message: event.message,
            event_type: event.event_type,
            cancel_reason: event.cancel_reason,
        })
        .collect();
    Ok(Some(BuildDetailResponse {
        build_request_id: build_info.build_request_id,
        status_code: build_info.status as i32,
        status_name: build_info.status.to_display_string(),
        requested_partitions: build_info.requested_partitions,
        total_jobs: build_info.total_jobs as u32,
        completed_jobs: build_info.completed_jobs as u32,
        failed_jobs: build_info.failed_jobs as u32,
        cancelled_jobs: build_info.cancelled_jobs as u32,
        requested_at: build_info.requested_at,
        started_at: build_info.started_at,
        completed_at: build_info.completed_at,
        duration_ms: build_info.duration_ms,
        cancelled: build_info.cancelled,
        cancel_reason: build_info.cancel_reason,
        timeline,
    }))
}
/// Cancel a build with a reason
///
/// This method uses the EventWriter to write a build cancellation event.

View file

@ -293,6 +293,48 @@ impl JobsRepository {
Ok(Some((job_info, job_runs)))
}
/// Show detailed information about a specific job using protobuf response format
///
/// Delegates to [`show`] and translates the result into a `JobDetailResponse`
/// with dual status fields (enum code + display name) and per-run details.
/// Returns `Ok(None)` when the job label is unknown.
pub async fn show_protobuf(&self, job_label: &str) -> Result<Option<JobDetailResponse>> {
    // Guard clause: bail out early when the job does not exist.
    let (job_info, job_runs) = match self.show(job_label).await? {
        Some(found) => found,
        None => return Ok(None),
    };
    // Each run record carries both the enum code and its display name.
    let runs: Vec<crate::JobRunDetail> = job_runs
        .into_iter()
        .map(|run| crate::JobRunDetail {
            job_run_id: run.job_run_id,
            build_request_id: run.build_request_id,
            target_partitions: run.target_partitions,
            status_code: run.status as i32,
            status_name: run.status.to_display_string(),
            started_at: run.started_at,
            completed_at: run.completed_at,
            duration_ms: run.duration_ms,
            message: run.message,
        })
        .collect();
    Ok(Some(JobDetailResponse {
        job_label: job_info.job_label,
        total_runs: job_info.total_runs as u32,
        successful_runs: job_info.successful_runs as u32,
        failed_runs: job_info.failed_runs as u32,
        cancelled_runs: job_info.cancelled_runs as u32,
        average_partitions_per_run: job_info.average_partitions_per_run,
        last_run_timestamp: job_info.last_run_timestamp,
        last_run_status_code: job_info.last_run_status as i32,
        last_run_status_name: job_info.last_run_status.to_display_string(),
        recent_builds: job_info.recent_builds,
        runs,
    }))
}
}
#[cfg(test)]

View file

@ -249,6 +249,42 @@ impl PartitionsRepository {
event_writer.invalidate_partition(build_request_id, partition, reason).await
}
/// Show detailed information about a specific partition using protobuf response format
///
/// Delegates to [`show`] and translates the result into a `PartitionDetailResponse`
/// with dual status fields (enum code + display name) and timeline events.
/// Returns `Ok(None)` when the partition ref is unknown.
pub async fn show_protobuf(&self, partition_ref: &str) -> Result<Option<PartitionDetailResponse>> {
    // Guard clause: bail out early when the partition does not exist.
    let (partition_info, timeline) = match self.show(partition_ref).await? {
        Some(found) => found,
        None => return Ok(None),
    };
    // Partition timeline events always have a status (unlike build/task events).
    let timeline: Vec<PartitionTimelineEvent> = timeline
        .into_iter()
        .map(|event| PartitionTimelineEvent {
            timestamp: event.timestamp,
            status_code: event.status as i32,
            status_name: event.status.to_display_string(),
            message: event.message,
            build_request_id: event.build_request_id,
            job_run_id: event.job_run_id,
        })
        .collect();
    Ok(Some(PartitionDetailResponse {
        partition_ref: partition_info.partition_ref,
        status_code: partition_info.current_status as i32,
        status_name: partition_info.current_status.to_display_string(),
        last_updated: partition_info.last_updated,
        builds_count: partition_info.builds_count as u32,
        last_successful_build: partition_info.last_successful_build,
        invalidation_count: partition_info.invalidation_count as u32,
        timeline,
    }))
}
/// List partitions returning protobuf response format with dual status fields
///
/// This method provides the unified CLI/Service response format with both

View file

@ -297,6 +297,48 @@ impl TasksRepository {
let event_writer = crate::event_log::writer::EventWriter::new(self.event_log.clone());
event_writer.cancel_task(build_request_id, job_run_id.to_string(), reason).await
}
/// Show detailed information about a specific task using protobuf response format
///
/// Delegates to [`show`] and translates the result into a `TaskDetailResponse`
/// with dual status fields (enum code + display name) and timeline events.
/// Returns `Ok(None)` when the job run id is unknown.
pub async fn show_protobuf(&self, job_run_id: &str) -> Result<Option<TaskDetailResponse>> {
    // Guard clause: bail out early when the task does not exist.
    let (task_info, timeline) = match self.show(job_run_id).await? {
        Some(found) => found,
        None => return Ok(None),
    };
    // Each timeline event carries both the enum code and its display name;
    // status is optional on events, hence the `map` calls.
    let timeline: Vec<TaskTimelineEvent> = timeline
        .into_iter()
        .map(|event| TaskTimelineEvent {
            timestamp: event.timestamp,
            status_code: event.status.map(|s| s as i32),
            status_name: event.status.map(|s| s.to_display_string()),
            message: event.message,
            event_type: event.event_type,
            cancel_reason: event.cancel_reason,
        })
        .collect();
    Ok(Some(TaskDetailResponse {
        job_run_id: task_info.job_run_id,
        job_label: task_info.job_label,
        build_request_id: task_info.build_request_id,
        status_code: task_info.status as i32,
        status_name: task_info.status.to_display_string(),
        target_partitions: task_info.target_partitions,
        scheduled_at: task_info.scheduled_at,
        started_at: task_info.started_at,
        completed_at: task_info.completed_at,
        duration_ms: task_info.duration_ms,
        cancelled: task_info.cancelled,
        cancel_reason: task_info.cancel_reason,
        message: task_info.message,
        timeline,
    }))
}
}
#[cfg(test)]

View file

@ -92,6 +92,25 @@ impl BuildRequestStatus {
}
}
impl DepType {
    /// Human-readable name for this dependency type ("query" / "materialize").
    pub fn to_display_string(&self) -> String {
        let name = match self {
            DepType::Query => "query",
            DepType::Materialize => "materialize",
        };
        name.to_string()
    }

    /// Inverse of [`to_display_string`]; `None` for unrecognized names.
    pub fn from_display_string(s: &str) -> Option<Self> {
        match s {
            "query" => Some(DepType::Query),
            "materialize" => Some(DepType::Materialize),
            _ => None,
        }
    }
}
/// Helper functions for creating protobuf list responses with dual status fields
pub mod list_response_helpers {
use super::*;
@ -203,6 +222,18 @@ pub mod list_response_helpers {
cancelled,
}
}
/// Create a DataDep carrying both the enum code and its display name,
/// keeping the two dual fields consistent by construction.
pub fn create_data_dep(
    dep_type: DepType,
    partition_ref: PartitionRef,
) -> DataDep {
    let dep_type_name = dep_type.to_display_string();
    DataDep {
        dep_type_code: dep_type as i32,
        dep_type_name,
        partition_ref: Some(partition_ref),
    }
}
}
#[cfg(test)]
@ -230,10 +261,22 @@ mod tests {
assert_eq!(BuildRequestStatus::from_display_string("completed"), Some(status));
}
#[test]
fn test_dep_type_conversions() {
    // Round-trip every variant through its display string.
    let cases = [
        (DepType::Materialize, "materialize"),
        (DepType::Query, "query"),
    ];
    for (dep_type, name) in cases {
        assert_eq!(dep_type.to_display_string(), name);
        assert_eq!(DepType::from_display_string(name), Some(dep_type));
    }
}
#[test]
fn test_invalid_display_string() {
// Unknown display strings must parse to None for every status/type enum.
assert_eq!(PartitionStatus::from_display_string("invalid"), None);
assert_eq!(JobStatus::from_display_string("invalid"), None);
assert_eq!(BuildRequestStatus::from_display_string("invalid"), None);
assert_eq!(DepType::from_display_string("invalid"), None);
}
}

View file

@ -55,8 +55,8 @@ def handle_config(args):
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": reviews_ref}},
{"dep_type": 1, "partition_ref": {"str": podcasts_ref}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": reviews_ref}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": podcasts_ref}}
],
"args": [category, date_str],
"env": {
@ -135,8 +135,8 @@ def handle_exec(args):
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"reviews/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": "podcasts/all"}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"reviews/date={target_date}"}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": "podcasts/all"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}

View file

@ -55,8 +55,8 @@ def handle_config(args):
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}},
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_stats_ref}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
@ -125,8 +125,8 @@ def handle_exec(args):
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}

View file

@ -56,7 +56,7 @@ def handle_config(args):
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
@ -113,7 +113,7 @@ def handle_exec(args):
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}

View file

@ -55,8 +55,8 @@ def handle_config(args):
configs.append({
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": phrase_models_ref}},
{"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_models_ref}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
],
"args": [category, date_str],
"env": {
@ -125,8 +125,8 @@ def handle_exec(args):
"config": {
"outputs": [{"str": partition_ref}],
"inputs": [
{"dep_type": 1, "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}},
{"dep_type": 1, "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}},
{"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
],
"args": [target_category, target_date],
"env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}