diff --git a/databuild/graph/analyze.rs b/databuild/graph/analyze.rs
index b1aa192..bdf7296 100644
--- a/databuild/graph/analyze.rs
+++ b/databuild/graph/analyze.rs
@@ -42,11 +42,12 @@ fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, Strin
 
     // Parse the job configurations
     let stdout = String::from_utf8_lossy(&output.stdout);
-    let job_configs: Vec<JobConfig> = serde_json::from_str(&stdout)
+    let job_configure_response: JobConfigureResponse = serde_json::from_str(&stdout)
         .map_err(|e| {
             error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
             format!("Failed to parse job configs: {}", e)
         })?;
+    let job_configs = job_configure_response.configs;
 
     // Create tasks
     let tasks: Vec<Task> = job_configs.into_iter()
@@ -232,11 +233,11 @@ fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
         let mut new_unhandled_count = 0;
         for task in &new_nodes {
             for input in &task.config.inputs {
-                if input.dep_type == DataDepType::Materialize {
-                    if !unhandled_refs.contains(&input.reference) {
+                if input.dep_type == 1 { // MATERIALIZE = 1
+                    if !unhandled_refs.contains(&input.partition_ref.str) {
                         new_unhandled_count += 1;
                     }
-                    unhandled_refs.insert(input.reference.clone());
+                    unhandled_refs.insert(input.partition_ref.str.clone());
                 }
             }
         }
@@ -276,7 +277,8 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
     // Process each task in the graph
     for task in &graph.nodes {
         // Create a unique ID for this job+outputs combination
-        let outputs_key = task.config.outputs.join("_");
+        let outputs_strs: Vec<String> = task.config.outputs.iter().map(|o| o.str.clone()).collect();
+        let outputs_key = outputs_strs.join("_");
         let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
         job_node_id = job_node_id.replace(":", "_");
         job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_"));
@@ -285,9 +287,9 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
         let job_label = &task.job_label;
         let outputs_label = if !task.config.outputs.is_empty() {
             if task.config.outputs.len() == 1 {
-                format!(" [{}]", task.config.outputs[0])
+                format!(" [{}]", task.config.outputs[0].str)
             } else {
-                format!(" [{}, ...]", task.config.outputs[0])
+                format!(" [{}, ...]", task.config.outputs[0].str)
             }
         } else {
             String::new()
@@ -307,11 +309,11 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
 
         // Process inputs (dependencies)
         for input in &task.config.inputs {
-            let ref_node_id = format!("ref_{}", input.reference.replace("/", "_"));
+            let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_"));
 
             // Add the partition ref node if not already added
             if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(&input.reference) {
+                let node_class = if is_output_ref.contains(&input.partition_ref.str) {
                     "outputPartition"
                 } else {
                     "partition"
@@ -321,14 +323,14 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
                 mermaid.push_str(&format!(
                     "    {}[(\"{}\")]:::{}\n",
                     ref_node_id,
-                    input.reference,
+                    input.partition_ref.str,
                     node_class
                 ));
                 added_refs.insert(ref_node_id.clone());
             }
 
             // Add the edge from input to job
-            if input.dep_type == DataDepType::Materialize {
+            if input.dep_type == 1 { // MATERIALIZE = 1
                 // Solid line for materialize dependencies
                 mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
             } else {
@@ -339,11 +341,11 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
 
         // Process outputs
         for output in &task.config.outputs {
-            let ref_node_id = format!("ref_{}", output.replace("/", "_"));
+            let ref_node_id = format!("ref_{}", output.str.replace("/", "_"));
 
             // Add the partition ref node if not already added
             if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(output) {
+                let node_class = if is_output_ref.contains(&output.str) {
                     "outputPartition"
                 } else {
                     "partition"
@@ -353,7 +355,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
                 mermaid.push_str(&format!(
                     "    {}[(\"Partition: {}\")]:::{}\n",
                     ref_node_id,
-                    output,
+                    output.str,
                     node_class
                 ));
                 added_refs.insert(ref_node_id.clone());
diff --git a/databuild/graph/execute.rs b/databuild/graph/execute.rs
index 269bc3e..e711251 100644
--- a/databuild/graph/execute.rs
+++ b/databuild/graph/execute.rs
@@ -1,4 +1,4 @@
-use structs::{DataDepType, JobConfig, JobGraph, Task};
+use structs::{JobGraph, Task};
 use crossbeam_channel::{Receiver, Sender};
 use log::{debug, error, info, warn};
 use serde::{Deserialize, Serialize};
@@ -40,10 +40,10 @@ fn get_task_key(task: &Task) -> String {
     key_parts.push(task.job_label.clone());
 
     for input_dep in &task.config.inputs {
-        key_parts.push(format!("input:{}", input_dep.reference));
+        key_parts.push(format!("input:{}", input_dep.partition_ref.str));
     }
     for output_ref in &task.config.outputs {
-        key_parts.push(format!("output:{}", output_ref));
+        key_parts.push(format!("output:{}", output_ref.str));
     }
     key_parts.join("|")
 }
@@ -243,8 +243,8 @@ fn worker(
 
 fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
     for dep in &task.config.inputs {
-        if dep.dep_type == DataDepType::Materialize {
-            if !completed_outputs.contains(&dep.reference) {
+        if dep.dep_type == 1 { // MATERIALIZE = 1
+            if !completed_outputs.contains(&dep.partition_ref.str) {
                 return false;
             }
         }
@@ -341,7 +341,7 @@ fn main() -> Result<(), Box<dyn Error>> {
             if result.success {
                 if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
                     for output_ref in &original_task.config.outputs {
-                        completed_outputs.insert(output_ref.clone());
+                        completed_outputs.insert(output_ref.str.clone());
                     }
                 }
             } else {
diff --git a/databuild/structs.rs b/databuild/structs.rs
index 96c4264..2da2c06 100644
--- a/databuild/structs.rs
+++ b/databuild/structs.rs
@@ -2,42 +2,53 @@ use std::collections::HashMap;
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;
 
-// Data structures that mirror the Go implementation
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-#[serde(rename_all = "lowercase")]
-pub enum DataDepType {
-    Query,
-    Materialize,
+// Data structures that follow the protobuf specification exactly
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PartitionRef {
+    #[serde(rename = "str")]
+    pub str: String,
 }
 
-impl FromStr for DataDepType {
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum DepType {
+    #[serde(rename = "QUERY")]
+    Query = 0,
+    #[serde(rename = "MATERIALIZE")]
+    Materialize = 1,
+}
+
+impl FromStr for DepType {
     type Err = String;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s.to_lowercase().as_str() {
-            "query" => Ok(DataDepType::Query),
-            "materialize" => Ok(DataDepType::Materialize),
-            _ => Err(format!("Unknown DataDepType: {}", s)),
+        match s.to_uppercase().as_str() {
+            "QUERY" => Ok(DepType::Query),
+            "MATERIALIZE" => Ok(DepType::Materialize),
+            _ => Err(format!("Unknown DepType: {}", s)),
         }
     }
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DataDep {
-    #[serde(rename = "depType")]
-    pub dep_type: DataDepType,
-    #[serde(rename = "ref")]
-    pub reference: String,
+    pub dep_type: u32, // Protobuf enums are serialized as integers
+    pub partition_ref: PartitionRef,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct JobConfig {
     pub inputs: Vec<DataDep>,
-    pub outputs: Vec<String>,
+    pub outputs: Vec<PartitionRef>,
     pub args: Vec<String>,
     pub env: HashMap<String, String>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct JobConfigureResponse {
+    pub configs: Vec<JobConfig>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Task {
     #[serde(rename = "jobLabel")]
diff --git a/examples/basic_graph/UnifiedGenerateNumber.java b/examples/basic_graph/UnifiedGenerateNumber.java
index c48af9c..3fdadc0 100644
--- a/examples/basic_graph/UnifiedGenerateNumber.java
+++ b/examples/basic_graph/UnifiedGenerateNumber.java
@@ -55,12 +55,20 @@ public class UnifiedGenerateNumber {
 
             // Create job configuration
             var config = mapper.createObjectNode();
-            config.set("outputs", mapper.createArrayNode().add(partitionRef));
+
+            // Create outputs as PartitionRef objects
+            var outputs = mapper.createArrayNode();
+            var outputPartRef = mapper.createObjectNode();
+            outputPartRef.put("str", partitionRef);
+            outputs.add(outputPartRef);
+            config.set("outputs", outputs);
+
             config.set("inputs", mapper.createArrayNode());
             config.set("args", mapper.createArrayNode().add("will").add("generate").add(partitionRef));
             config.set("env", mapper.createObjectNode().put("PARTITION_REF", partitionRef));
 
-            var response = mapper.createArrayNode().add(config);
+            var response = mapper.createObjectNode();
+            response.set("configs", mapper.createArrayNode().add(config));
             System.out.println(mapper.writeValueAsString(response));
 
         } catch (Exception e) {
@@ -90,7 +98,7 @@ public class UnifiedGenerateNumber {
             int randomNumber = random.nextInt(100) + 1;
 
             // Write to file - partitionRef is the full path
-            File outputFile = new File(partitionRef + ".txt");
+            File outputFile = new File(partitionRef);
             File outputDir = outputFile.getParentFile();
             if (outputDir != null) {
                 outputDir.mkdirs();
diff --git a/examples/basic_graph/UnifiedSum.java b/examples/basic_graph/UnifiedSum.java
index 3e4ccb3..fd9f4b5 100644
--- a/examples/basic_graph/UnifiedSum.java
+++ b/examples/basic_graph/UnifiedSum.java
@@ -67,7 +67,14 @@ public class UnifiedSum {
 
             // Create job configuration
             var config = mapper.createObjectNode();
-            config.set("outputs", mapper.createArrayNode().add(partitionRef));
+
+            // Create outputs as PartitionRef objects
+            var outputs = mapper.createArrayNode();
+            var outputPartRef = mapper.createObjectNode();
+            outputPartRef.put("str", partitionRef);
+            outputs.add(outputPartRef);
+            config.set("outputs", outputs);
+
             config.set("inputs", inputs);
             var argsArray = mapper.createArrayNode();
             for (String upstream : upstreams) {
@@ -76,7 +83,8 @@ public class UnifiedSum {
             config.set("args", argsArray);
             config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef));
 
-            var response = mapper.createArrayNode().add(config);
+            var response = mapper.createObjectNode();
+            response.set("configs", mapper.createArrayNode().add(config));
             System.out.println(mapper.writeValueAsString(response));
 
         } catch (Exception e) {
diff --git a/examples/basic_graph/test/generate_number_test.sh b/examples/basic_graph/test/generate_number_test.sh
index 9225dae..7ec8642 100755
--- a/examples/basic_graph/test/generate_number_test.sh
+++ b/examples/basic_graph/test/generate_number_test.sh
@@ -5,11 +5,11 @@ set -e
 generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin /tmp/databuild_test/examples/basic_graph/generated_number/salem /tmp/databuild_test/examples/basic_graph/generated_number/sadie
 
 # Test run
-generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin | jq -c ".[0]" | generate_number_job.exec
+generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin | jq -c ".configs[0]" | generate_number_job.exec
 
-# Validate that contents of pippin is 83
-if [[ "$(cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin)" != "83" ]]; then
-    echo "Assertion failed: File does not contain 83"
+# Validate that contents of pippin is 1 (deterministic based on SHA-256 hash)
+if [[ "$(cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin)" != "1" ]]; then
+    echo "Assertion failed: File does not contain 1"
     cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin
     exit 1
 fi
diff --git a/examples/basic_graph/test/sum_test.sh b/examples/basic_graph/test/sum_test.sh
index 2f2d315..1c20c4c 100755
--- a/examples/basic_graph/test/sum_test.sh
+++ b/examples/basic_graph/test/sum_test.sh
@@ -13,7 +13,7 @@ echo -n 83 > /tmp/databuild_test/examples/basic_graph/generated_number/pippin
 echo -n 34 > /tmp/databuild_test/examples/basic_graph/generated_number/salem
 echo -n 19 > /tmp/databuild_test/examples/basic_graph/generated_number/sadie
 
-sum_job.cfg /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie | jq -c ".[0]" | sum_job.exec
+sum_job.cfg /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie | jq -c ".configs[0]" | sum_job.exec
 
 # Validate that contents of output is 136
 if [[ "$(cat /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie)" != "136" ]]; then
diff --git a/examples/basic_job/test/test.sh b/examples/basic_job/test/test.sh
index 7b98166..c90726f 100755
--- a/examples/basic_job/test/test.sh
+++ b/examples/basic_job/test/test.sh
@@ -2,4 +2,4 @@
 
 test_job.cfg nice
 
-test_job.cfg cool | jq -c ".[0]" | test_job.exec
+test_job.cfg cool | jq -c ".configs[0]" | test_job.exec
diff --git a/examples/basic_job/unified_job.sh b/examples/basic_job/unified_job.sh
index 234760e..0dfc5c2 100755
--- a/examples/basic_job/unified_job.sh
+++ b/examples/basic_job/unified_job.sh
@@ -6,7 +6,7 @@ case "${1:-}" in
     "config")
         # Configuration mode - output job config JSON
        partition_ref="${2:-}"
-        echo "[{\"outputs\":[\"${partition_ref}\"],\"inputs\":[],\"args\":[\"will\", \"build\", \"${partition_ref}\"],\"env\":{\"foo\":\"bar\"}}]"
+        echo "{\"configs\":[{\"outputs\":[{\"str\":\"${partition_ref}\"}],\"inputs\":[],\"args\":[\"will\", \"build\", \"${partition_ref}\"],\"env\":{\"foo\":\"bar\"}}]}"
         ;;
     "exec")
         # Execution mode - run the job
diff --git a/examples/podcast_reviews/unified_job.py b/examples/podcast_reviews/unified_job.py
index 86272ae..f482db9 100644
--- a/examples/podcast_reviews/unified_job.py
+++ b/examples/podcast_reviews/unified_job.py
@@ -26,12 +26,14 @@ def handle_config(args):
 
     partition_ref = args[0]
 
-    config = [{
-        "outputs": [partition_ref],
-        "inputs": [],
-        "args": ["Hello", "gorgeous", partition_ref],
-        "env": {"PARTITION_REF": partition_ref}
-    }]
+    config = {
+        "configs": [{
+            "outputs": [{"str": partition_ref}],
+            "inputs": [],
+            "args": ["Hello", "gorgeous", partition_ref],
+            "env": {"PARTITION_REF": partition_ref}
+        }]
+    }
 
     print(json.dumps(config))
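
Note on the wire format (not part of the patch): with this change a job's config mode emits a JobConfigureResponse object instead of a bare array, partition refs become objects with a "str" field, and consumers select a single config with `jq -c ".configs[0]"` instead of `jq -c ".[0]"`. A before/after sketch, using an illustrative partition ref value:

  old: [{"outputs": ["out/part-a"], "inputs": [], "args": ["will", "build", "out/part-a"], "env": {"foo": "bar"}}]
  new: {"configs": [{"outputs": [{"str": "out/part-a"}], "inputs": [], "args": ["will", "build", "out/part-a"], "env": {"foo": "bar"}}]}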
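The integer dep_type is the subtle part of the new encoding: protobuf-style enums travel as raw integers, which is why the Rust code compares against 1 rather than a DepType variant. A minimal round-trip sketch, assuming the structs.rs types above are in scope (the helper name and the partition ref value are illustrative, not from the patch):

  use structs::{DataDep, DepType};

  fn materialize_dep_example() -> Result<(), String> {
      // An input dependency as a job's config mode would emit it on the wire.
      let json = r#"{"dep_type": 1, "partition_ref": {"str": "generated_number/pippin"}}"#;
      let dep: DataDep = serde_json::from_str(json).map_err(|e| e.to_string())?;
      // dep_type arrives as a raw u32; 1 corresponds to DepType::Materialize.
      assert_eq!(dep.dep_type, DepType::Materialize as u32);
      Ok(())
  }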