Match proto definitions

Stuart Axelbrooke 2025-06-29 20:47:07 -07:00
parent bdd8fd7e0d
commit 196622fe17
10 changed files with 85 additions and 54 deletions

View file

@@ -42,11 +42,12 @@ fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, Strin
     // Parse the job configurations
     let stdout = String::from_utf8_lossy(&output.stdout);
-    let job_configs: Vec<JobConfig> = serde_json::from_str(&stdout)
+    let job_configure_response: JobConfigureResponse = serde_json::from_str(&stdout)
         .map_err(|e| {
             error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
             format!("Failed to parse job configs: {}", e)
         })?;
+    let job_configs = job_configure_response.configs;

     // Create tasks
     let tasks: Vec<Task> = job_configs.into_iter()
@@ -232,11 +233,11 @@ fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
         let mut new_unhandled_count = 0;
         for task in &new_nodes {
             for input in &task.config.inputs {
-                if input.dep_type == DataDepType::Materialize {
-                    if !unhandled_refs.contains(&input.reference) {
+                if input.dep_type == 1 { // MATERIALIZE = 1
+                    if !unhandled_refs.contains(&input.partition_ref.str) {
                         new_unhandled_count += 1;
                     }
-                    unhandled_refs.insert(input.reference.clone());
+                    unhandled_refs.insert(input.partition_ref.str.clone());
                 }
             }
         }
@@ -276,7 +277,8 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
     // Process each task in the graph
     for task in &graph.nodes {
         // Create a unique ID for this job+outputs combination
-        let outputs_key = task.config.outputs.join("_");
+        let outputs_strs: Vec<String> = task.config.outputs.iter().map(|o| o.str.clone()).collect();
+        let outputs_key = outputs_strs.join("_");
         let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
         job_node_id = job_node_id.replace(":", "_");
         job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_"));
@@ -285,9 +287,9 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
         let job_label = &task.job_label;
         let outputs_label = if !task.config.outputs.is_empty() {
             if task.config.outputs.len() == 1 {
-                format!(" [{}]", task.config.outputs[0])
+                format!(" [{}]", task.config.outputs[0].str)
             } else {
-                format!(" [{}, ...]", task.config.outputs[0])
+                format!(" [{}, ...]", task.config.outputs[0].str)
             }
         } else {
             String::new()
@@ -307,11 +309,11 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
         // Process inputs (dependencies)
         for input in &task.config.inputs {
-            let ref_node_id = format!("ref_{}", input.reference.replace("/", "_"));
+            let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_"));

             // Add the partition ref node if not already added
             if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(&input.reference) {
+                let node_class = if is_output_ref.contains(&input.partition_ref.str) {
                     "outputPartition"
                 } else {
                     "partition"
@@ -321,14 +323,14 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
                 mermaid.push_str(&format!(
                     "    {}[(\"{}\")]:::{}\n",
                     ref_node_id,
-                    input.reference,
+                    input.partition_ref.str,
                     node_class
                 ));
                 added_refs.insert(ref_node_id.clone());
             }

             // Add the edge from input to job
-            if input.dep_type == DataDepType::Materialize {
+            if input.dep_type == 1 { // MATERIALIZE = 1
                 // Solid line for materialize dependencies
                 mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
             } else {
@@ -339,11 +341,11 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
         // Process outputs
         for output in &task.config.outputs {
-            let ref_node_id = format!("ref_{}", output.replace("/", "_"));
+            let ref_node_id = format!("ref_{}", output.str.replace("/", "_"));

             // Add the partition ref node if not already added
             if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(output) {
+                let node_class = if is_output_ref.contains(&output.str) {
                     "outputPartition"
                 } else {
                     "partition"
@@ -353,7 +355,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
                 mermaid.push_str(&format!(
                     "    {}[(\"Partition: {}\")]:::{}\n",
                     ref_node_id,
-                    output,
+                    output.str,
                     node_class
                 ));
                 added_refs.insert(ref_node_id.clone());

View file

@@ -1,4 +1,4 @@
-use structs::{DataDepType, JobConfig, JobGraph, Task};
+use structs::{JobGraph, Task};
 use crossbeam_channel::{Receiver, Sender};
 use log::{debug, error, info, warn};
 use serde::{Deserialize, Serialize};
@@ -40,10 +40,10 @@ fn get_task_key(task: &Task) -> String {
     key_parts.push(task.job_label.clone());
     for input_dep in &task.config.inputs {
-        key_parts.push(format!("input:{}", input_dep.reference));
+        key_parts.push(format!("input:{}", input_dep.partition_ref.str));
     }

     for output_ref in &task.config.outputs {
-        key_parts.push(format!("output:{}", output_ref));
+        key_parts.push(format!("output:{}", output_ref.str));
     }
     key_parts.join("|")
 }
@@ -243,8 +243,8 @@ fn worker(
 fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
     for dep in &task.config.inputs {
-        if dep.dep_type == DataDepType::Materialize {
-            if !completed_outputs.contains(&dep.reference) {
+        if dep.dep_type == 1 { // MATERIALIZE = 1
+            if !completed_outputs.contains(&dep.partition_ref.str) {
                 return false;
             }
         }
@@ -341,7 +341,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         if result.success {
             if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
                 for output_ref in &original_task.config.outputs {
-                    completed_outputs.insert(output_ref.clone());
+                    completed_outputs.insert(output_ref.str.clone());
                 }
             }
         } else {

View file

@@ -2,42 +2,53 @@ use std::collections::HashMap;
 use serde::{Deserialize, Serialize};
 use std::str::FromStr;

-// Data structures that mirror the Go implementation
+// Data structures that follow the protobuf specification exactly

-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
-#[serde(rename_all = "lowercase")]
-pub enum DataDepType {
-    Query,
-    Materialize,
-}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PartitionRef {
+    #[serde(rename = "str")]
+    pub str: String,
+}

-impl FromStr for DataDepType {
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum DepType {
+    #[serde(rename = "QUERY")]
+    Query = 0,
+    #[serde(rename = "MATERIALIZE")]
+    Materialize = 1,
+}
+
+impl FromStr for DepType {
     type Err = String;

     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s.to_lowercase().as_str() {
-            "query" => Ok(DataDepType::Query),
-            "materialize" => Ok(DataDepType::Materialize),
-            _ => Err(format!("Unknown DataDepType: {}", s)),
+        match s.to_uppercase().as_str() {
+            "QUERY" => Ok(DepType::Query),
+            "MATERIALIZE" => Ok(DepType::Materialize),
+            _ => Err(format!("Unknown DepType: {}", s)),
         }
     }
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DataDep {
-    #[serde(rename = "depType")]
-    pub dep_type: DataDepType,
-    #[serde(rename = "ref")]
-    pub reference: String,
+    pub dep_type: u32, // Protobuf enums are serialized as integers
+    pub partition_ref: PartitionRef,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct JobConfig {
     pub inputs: Vec<DataDep>,
-    pub outputs: Vec<String>,
+    pub outputs: Vec<PartitionRef>,
     pub args: Vec<String>,
     pub env: HashMap<String, String>,
 }

+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct JobConfigureResponse {
+    pub configs: Vec<JobConfig>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Task {
     #[serde(rename = "jobLabel")]
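
For reference, the JSON envelope these structs now describe looks like the following. This is a minimal sketch, not part of the commit, with made-up partition refs, and it assumes serde_json is already a dependency of the crate:

    fn parse_envelope_example() {
        // Config mode now prints {"configs": [...]} instead of a bare array.
        let raw = r#"{
            "configs": [{
                "inputs": [{"dep_type": 1, "partition_ref": {"str": "generated_number/pippin"}}],
                "outputs": [{"str": "sum/pippin_salem_sadie"}],
                "args": ["sum", "generated_number/pippin"],
                "env": {}
            }]
        }"#;
        let response: JobConfigureResponse = serde_json::from_str(raw).expect("valid envelope");
        assert_eq!(response.configs[0].outputs[0].str, "sum/pippin_salem_sadie");
        assert_eq!(response.configs[0].inputs[0].dep_type, 1); // MATERIALIZE = 1
    }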

View file

@@ -55,12 +55,20 @@ public class UnifiedGenerateNumber {
             // Create job configuration
             var config = mapper.createObjectNode();
-            config.set("outputs", mapper.createArrayNode().add(partitionRef));
+
+            // Create outputs as PartitionRef objects
+            var outputs = mapper.createArrayNode();
+            var outputPartRef = mapper.createObjectNode();
+            outputPartRef.put("str", partitionRef);
+            outputs.add(outputPartRef);
+            config.set("outputs", outputs);
             config.set("inputs", mapper.createArrayNode());
             config.set("args", mapper.createArrayNode().add("will").add("generate").add(partitionRef));
             config.set("env", mapper.createObjectNode().put("PARTITION_REF", partitionRef));

-            var response = mapper.createArrayNode().add(config);
+            var response = mapper.createObjectNode();
+            response.set("configs", mapper.createArrayNode().add(config));

             System.out.println(mapper.writeValueAsString(response));
         } catch (Exception e) {
@@ -90,7 +98,7 @@ public class UnifiedGenerateNumber {
             int randomNumber = random.nextInt(100) + 1;

             // Write to file - partitionRef is the full path
-            File outputFile = new File(partitionRef + ".txt");
+            File outputFile = new File(partitionRef);
             File outputDir = outputFile.getParentFile();
             if (outputDir != null) {
                 outputDir.mkdirs();

View file

@@ -67,7 +67,14 @@ public class UnifiedSum {
             // Create job configuration
             var config = mapper.createObjectNode();
-            config.set("outputs", mapper.createArrayNode().add(partitionRef));
+
+            // Create outputs as PartitionRef objects
+            var outputs = mapper.createArrayNode();
+            var outputPartRef = mapper.createObjectNode();
+            outputPartRef.put("str", partitionRef);
+            outputs.add(outputPartRef);
+            config.set("outputs", outputs);
             config.set("inputs", inputs);

             var argsArray = mapper.createArrayNode();
             for (String upstream : upstreams) {
@@ -76,7 +83,8 @@ public class UnifiedSum {
             config.set("args", argsArray);
             config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef));

-            var response = mapper.createArrayNode().add(config);
+            var response = mapper.createObjectNode();
+            response.set("configs", mapper.createArrayNode().add(config));

             System.out.println(mapper.writeValueAsString(response));
         } catch (Exception e) {

View file

@@ -5,11 +5,11 @@ set -e
 generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin /tmp/databuild_test/examples/basic_graph/generated_number/salem /tmp/databuild_test/examples/basic_graph/generated_number/sadie

 # Test run
-generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin | jq -c ".[0]" | generate_number_job.exec
+generate_number_job.cfg /tmp/databuild_test/examples/basic_graph/generated_number/pippin | jq -c ".configs[0]" | generate_number_job.exec

-# Validate that contents of pippin is 83
-if [[ "$(cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin)" != "83" ]]; then
-    echo "Assertion failed: File does not contain 83"
+# Validate that contents of pippin is 1 (deterministic based on SHA-256 hash)
+if [[ "$(cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin)" != "1" ]]; then
+    echo "Assertion failed: File does not contain 1"
     cat /tmp/databuild_test/examples/basic_graph/generated_number/pippin
     exit 1
 fi
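
The expected value changes from 83 to 1 because, per the updated test comment, the generated number is now derived deterministically from a SHA-256 hash of the partition ref; the exact hashing scheme lives in the Java job and is not shown in this diff. A sketch of one such derivation, using the sha2 crate, purely for illustration:

    use sha2::{Digest, Sha256};

    // Map a partition ref to a stable number in 1..=100 (illustrative only;
    // the Java job's actual scheme is not part of this diff).
    fn deterministic_number(partition_ref: &str) -> u32 {
        let digest = Sha256::digest(partition_ref.as_bytes());
        u32::from_be_bytes([digest[0], digest[1], digest[2], digest[3]]) % 100 + 1
    }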

View file

@@ -13,7 +13,7 @@ echo -n 83 > /tmp/databuild_test/examples/basic_graph/generated_number/pippin
 echo -n 34 > /tmp/databuild_test/examples/basic_graph/generated_number/salem
 echo -n 19 > /tmp/databuild_test/examples/basic_graph/generated_number/sadie

-sum_job.cfg /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie | jq -c ".[0]" | sum_job.exec
+sum_job.cfg /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie | jq -c ".configs[0]" | sum_job.exec

 # Validate that contents of output is 136
 if [[ "$(cat /tmp/databuild_test/examples/basic_graph/sum/pippin_salem_sadie)" != "136" ]]; then

View file

@@ -2,4 +2,4 @@
 test_job.cfg nice

-test_job.cfg cool | jq -c ".[0]" | test_job.exec
+test_job.cfg cool | jq -c ".configs[0]" | test_job.exec

View file

@@ -6,7 +6,7 @@ case "${1:-}" in
     "config")
         # Configuration mode - output job config JSON
        partition_ref="${2:-}"
-        echo "[{\"outputs\":[\"${partition_ref}\"],\"inputs\":[],\"args\":[\"will\", \"build\", \"${partition_ref}\"],\"env\":{\"foo\":\"bar\"}}]"
+        echo "{\"configs\":[{\"outputs\":[{\"str\":\"${partition_ref}\"}],\"inputs\":[],\"args\":[\"will\", \"build\", \"${partition_ref}\"],\"env\":{\"foo\":\"bar\"}}]}"
         ;;
     "exec")
         # Execution mode - run the job

View file

@@ -26,12 +26,14 @@ def handle_config(args):
     partition_ref = args[0]

-    config = [{
-        "outputs": [partition_ref],
-        "inputs": [],
-        "args": ["Hello", "gorgeous", partition_ref],
-        "env": {"PARTITION_REF": partition_ref}
-    }]
+    config = {
+        "configs": [{
+            "outputs": [{"str": partition_ref}],
+            "inputs": [],
+            "args": ["Hello", "gorgeous", partition_ref],
+            "env": {"PARTITION_REF": partition_ref}
+        }]
+    }

     print(json.dumps(config))
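
Every job touched above (Java, bash, Python) now prints the same {"configs": [...]} envelope, which is what lets the tests uniformly pipe jq -c ".configs[0]" into the exec mode. A sketch of that handshake from the engine side, assuming cfg/exec entry points that behave like the test binaries and reusing the JobConfigureResponse struct from this commit:

    use std::io::Write;
    use std::process::{Command, Stdio};

    // Run a job's config mode, pull the first JobConfig out of the envelope,
    // and feed it to the exec mode on stdin.
    fn run_first_config(cfg_bin: &str, exec_bin: &str, partition_ref: &str) -> std::io::Result<()> {
        let out = Command::new(cfg_bin).arg(partition_ref).output()?;
        let response: JobConfigureResponse =
            serde_json::from_slice(&out.stdout).expect("configs envelope");
        let first = serde_json::to_string(&response.configs[0]).expect("serializable config");

        let mut exec = Command::new(exec_bin).stdin(Stdio::piped()).spawn()?;
        exec.stdin.as_mut().expect("piped stdin").write_all(first.as_bytes())?;
        exec.wait()?;
        Ok(())
    }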