From 48a12888914cac175d8b32563d4ed816552eda8f Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sun, 29 Jun 2025 20:14:33 -0700 Subject: [PATCH] Clean up old paths --- databuild/rules.bzl | 80 +++--------- examples/basic_graph/BUILD.bazel | 76 +++--------- examples/basic_graph/GenerateConfigure.java | 46 ------- examples/basic_graph/GenerateExecute.java | 70 ----------- examples/basic_graph/SumConfigure.java | 58 --------- examples/basic_graph/SumExecute.java | 44 ------- examples/basic_graph/UnifiedSum.java | 130 ++++++++++++++++++++ examples/basic_job/configure.sh | 2 - examples/basic_job/execute.sh | 3 - examples/podcast_reviews/BUILD.bazel | 15 +-- examples/podcast_reviews/configure.py | 2 - examples/podcast_reviews/execute.py | 2 - examples/podcast_reviews/unified_job.py | 47 +++++++ 13 files changed, 215 insertions(+), 360 deletions(-) delete mode 100644 examples/basic_graph/GenerateConfigure.java delete mode 100644 examples/basic_graph/GenerateExecute.java delete mode 100644 examples/basic_graph/SumConfigure.java delete mode 100644 examples/basic_graph/SumExecute.java create mode 100644 examples/basic_graph/UnifiedSum.java delete mode 100755 examples/basic_job/configure.sh delete mode 100755 examples/basic_job/execute.sh delete mode 100644 examples/podcast_reviews/configure.py delete mode 100644 examples/podcast_reviews/execute.py create mode 100644 examples/podcast_reviews/unified_job.py diff --git a/databuild/rules.bzl b/databuild/rules.bzl index 50dc9bd..29526e2 100644 --- a/databuild/rules.bzl +++ b/databuild/rules.bzl @@ -30,56 +30,28 @@ source "${RUNFILES_DIR:-/dev/null}/$f" 2>/dev/null || \ def databuild_job( name, - configure = None, - execute = None, - binary = None, + binary, visibility = None): """Creates a DataBuild job target with configuration and execution capabilities. 
Args: name: Name of the job target - configure: Target that implements the configuration logic (optional if binary is provided) - execute: Target that implements the execution logic (optional if binary is provided) - binary: Single binary target that handles both config and exec via subcommands (optional) + binary: Single binary target that handles both config and exec via subcommands visibility: Visibility specification """ - # Validate arguments - if binary and (configure or execute): - fail("Cannot specify both 'binary' and 'configure'/'execute' parameters") - if not binary and not (configure and execute): - fail("Must specify either 'binary' or both 'configure' and 'execute' parameters") + # Single binary approach - use subcommands + _databuild_job_cfg_rule( + name = name + ".cfg", + configure = binary, + visibility = visibility, + ) - if binary: - # Single binary approach - use subcommands - _databuild_job_cfg_rule( - name = name + ".cfg", - configure = binary, - use_subcommand = True, - visibility = visibility, - ) - - _databuild_job_exec_rule( - name = name + ".exec", - execute = binary, - use_subcommand = True, - visibility = visibility, - ) - else: - # Legacy approach - separate binaries - _databuild_job_cfg_rule( - name = name + ".cfg", - configure = configure, - use_subcommand = False, - visibility = visibility, - ) - - _databuild_job_exec_rule( - name = name + ".exec", - execute = execute, - use_subcommand = False, - visibility = visibility, - ) + _databuild_job_exec_rule( + name = name + ".exec", + execute = binary, + visibility = visibility, + ) # Create a job target that configures then executes _databuild_job_rule( @@ -94,21 +66,13 @@ def _databuild_job_cfg_impl(ctx): configure_path = ctx.attr.configure.files_to_run.executable.path script = ctx.actions.declare_file(ctx.label.name) - # Handle subcommand for single binary approach - if ctx.attr.use_subcommand: - executable_cmd = configure_path + " config" - prefix = 
"EXECUTABLE_SUBCOMMAND=\"config\"\n" - else: - executable_cmd = configure_path - prefix = "" - ctx.actions.expand_template( template = ctx.file._template, output = script, substitutions = { "%{EXECUTABLE_PATH}": configure_path, "%{RUNFILES_PREFIX}": RUNFILES_PREFIX, - "%{PREFIX}": prefix, + "%{PREFIX}": "EXECUTABLE_SUBCOMMAND=\"config\"\n", }, is_executable = True, ) @@ -135,10 +99,6 @@ _databuild_job_cfg_rule = rule( cfg = "target", mandatory = True, ), - "use_subcommand": attr.bool( - doc = "Whether to append 'config' subcommand to executable", - default = False, - ), "_template": attr.label( default = "@databuild//databuild/runtime:simple_executable_wrapper.sh.tpl", allow_single_file = True, @@ -160,12 +120,6 @@ def _databuild_job_exec_impl(ctx): # Get the correct runfiles paths jq_path = ctx.attr._jq.files_to_run.executable.path execute_path = ctx.attr.execute.files_to_run.executable.path - - # Handle subcommand for single binary approach - if ctx.attr.use_subcommand: - prefix = "EXECUTE_SUBCOMMAND=\"exec\"\n" - else: - prefix = "" ctx.actions.expand_template( template = ctx.file._template, @@ -174,7 +128,7 @@ def _databuild_job_exec_impl(ctx): "%{JQ_PATH}": jq_path, "%{EXECUTE_PATH}": execute_path, "%{RUNFILES_PREFIX}": RUNFILES_PREFIX, - "%{PREFIX}": prefix, + "%{PREFIX}": "EXECUTE_SUBCOMMAND=\"exec\"\n", }, is_executable = True, ) @@ -211,10 +165,6 @@ _databuild_job_exec_rule = rule( executable = True, cfg = "target", ), - "use_subcommand": attr.bool( - doc = "Whether to append 'exec' subcommand to executable", - default = False, - ), "_template": attr.label( default = "@databuild//databuild/job:execute_wrapper.sh.tpl", allow_single_file = True, diff --git a/examples/basic_graph/BUILD.bazel b/examples/basic_graph/BUILD.bazel index 12a9ae0..46abb0d 100644 --- a/examples/basic_graph/BUILD.bazel +++ b/examples/basic_graph/BUILD.bazel @@ -35,67 +35,12 @@ py_binary( databuild_job( name = "generate_number_job", - configure = ":generate_number_configure", - execute 
= ":generate_number_execute", + binary = ":generate_number_binary", visibility = ["//visibility:public"], ) java_binary( - name = "generate_number_configure", - srcs = glob(["*.java"]), - create_executable = True, - main_class = "com.databuild.examples.basic_graph.GenerateConfigure", - deps = [ - "@maven//:com_fasterxml_jackson_core_jackson_annotations", - "@maven//:com_fasterxml_jackson_core_jackson_core", - "@maven//:com_fasterxml_jackson_core_jackson_databind", - "@maven//:com_fasterxml_jackson_module_jackson_module_jsonSchema", - ], -) - -java_binary( - name = "generate_number_execute", - srcs = glob(["GenerateExecute.java"]), - main_class = "com.databuild.examples.basic_graph.GenerateExecute", -) - -databuild_job( - name = "sum_job", - configure = ":sum_configure", - execute = ":sum_execute", - visibility = ["//visibility:public"], -) - -# New unified approach test -databuild_job( - name = "unified_generate_job", - binary = ":unified_generate_binary", - visibility = ["//visibility:public"], -) - -java_binary( - name = "sum_configure", - srcs = glob(["*.java"]), - main_class = "com.databuild.examples.basic_graph.SumConfigure", - deps = [ - "@maven//:com_fasterxml_jackson_core_jackson_annotations", - "@maven//:com_fasterxml_jackson_core_jackson_core", - "@maven//:com_fasterxml_jackson_core_jackson_databind", - "@maven//:com_fasterxml_jackson_module_jackson_module_jsonSchema", - ], -) - -java_binary( - name = "sum_execute", - srcs = glob([ - "SumExecute.java", - "GenerateExecute.java", - ]), - main_class = "com.databuild.examples.basic_graph.SumExecute", -) - -java_binary( - name = "unified_generate_binary", + name = "generate_number_binary", srcs = ["UnifiedGenerateNumber.java"], main_class = "com.databuild.examples.basic_graph.UnifiedGenerateNumber", deps = [ @@ -104,3 +49,20 @@ java_binary( "@maven//:com_fasterxml_jackson_core_jackson_databind", ], ) + +databuild_job( + name = "sum_job", + binary = ":sum_binary", + visibility = ["//visibility:public"], +) + 
+java_binary( + name = "sum_binary", + srcs = ["UnifiedSum.java"], + main_class = "com.databuild.examples.basic_graph.UnifiedSum", + deps = [ + "@maven//:com_fasterxml_jackson_core_jackson_annotations", + "@maven//:com_fasterxml_jackson_core_jackson_core", + "@maven//:com_fasterxml_jackson_core_jackson_databind", + ], +) \ No newline at end of file diff --git a/examples/basic_graph/GenerateConfigure.java b/examples/basic_graph/GenerateConfigure.java deleted file mode 100644 index f052dad..0000000 --- a/examples/basic_graph/GenerateConfigure.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.databuild.examples.basic_graph; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.JsonNode; - -import java.util.ArrayList; -import java.util.List; -import java.io.File; -import java.util.Arrays; -import java.util.Collections; - -/** - * Configure class for generating a random number. - * This class creates a job configuration for generating a random number based on the partition ref. 
- */ -public class GenerateConfigure { - public static void main(String[] args) { - if (args.length < 1) { - System.err.println("Error: Partition ref is required"); - System.exit(1); - } - List configList = new ArrayList<>(); - - // Process each partition ref from input arguments - Arrays.stream(args).forEach(partitionRef -> { - // Create and populate JobConfig object - JobConfig config = new JobConfig(); - config.outputs = Collections.singletonList(partitionRef); - config.args = Arrays.asList(partitionRef); - // inputs and env are already initialized as empty collections in the constructor - configList.add(config); - }); - - try { - ObjectMapper mapper = new ObjectMapper(); - - // Convert config list to JsonNode and serialize - JsonNode configNode = mapper.valueToTree(configList); - String jsonConfig = mapper.writeValueAsString(configNode); - System.out.println(jsonConfig); - } catch (Exception e) { - System.err.println("Error: Failed to validate or serialize config: " + e.getMessage()); - System.exit(1); - } - } -} \ No newline at end of file diff --git a/examples/basic_graph/GenerateExecute.java b/examples/basic_graph/GenerateExecute.java deleted file mode 100644 index 671d727..0000000 --- a/examples/basic_graph/GenerateExecute.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.databuild.examples.basic_graph; - -import java.io.FileWriter; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Random; - -/** - * Execute class for generating a random number. - * This class generates a random number based on the partition ref. 
- */ -public class GenerateExecute { - public static String BASE_PATH = "/tmp/databuild_test/examples/basic_graph/"; - - public static void main(String[] args) { - if (args.length < 1) { - System.err.println("Error: Partition ref (output path) is required"); - System.exit(1); - } - - String partitionRef = args[0]; - - try { - // Create a hash of the partition ref to use as a seed - MessageDigest md = MessageDigest.getInstance("MD5"); - byte[] hashBytes = md.digest(partitionRef.getBytes(StandardCharsets.UTF_8)); - - // Convert the first 8 bytes of the hash to a long to use as a seed - long seed = 0; - for (int i = 0; i < Math.min(8, hashBytes.length); i++) { - seed = (seed << 8) | (hashBytes[i] & 0xff); - } - - // Create a random number generator with the seed - Random random = new Random(seed); - - // Generate a random number - int randomNumber = random.nextInt(100); - - // Write the random number to the output file - // Ensure dir exists - java.io.File parent = new java.io.File(partitionRef).getParentFile(); - if (parent != null) { - parent.mkdirs(); - } - try (FileWriter writer = new FileWriter(partitionRef)) { - writer.write("Random number for partition " + partitionRef + ": " + randomNumber); - } - - System.out.println("Generated random number " + randomNumber + " for partition " + partitionRef); - - // Write the random number to the output file - String outputPath = partitionRef; - System.out.println("Writing random number " + randomNumber + " to " + outputPath); - // Ensure dir exists - new java.io.File(outputPath).getParentFile().mkdirs(); - // Write number (overwrite) - try (FileWriter writer = new FileWriter(outputPath)) { - writer.write(String.valueOf(randomNumber)); - } - - } catch (NoSuchAlgorithmException | IOException e) { - System.err.println("Error: " + e.getMessage()); - e.printStackTrace(); - System.exit(1); - } - } -} \ No newline at end of file diff --git a/examples/basic_graph/SumConfigure.java b/examples/basic_graph/SumConfigure.java deleted 
file mode 100644 index 197940a..0000000 --- a/examples/basic_graph/SumConfigure.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.databuild.examples.basic_graph; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.JsonNode; - -import java.io.File; -import java.util.Arrays; -import java.util.Collections; -import java.util.stream.Collectors; - -import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH; - -/** - * Configure class for generating a random number. - * This class creates a job configuration for generating a random number based on the partition ref. - */ -public class SumConfigure { - public static void main(String[] args) { - if (args.length != 1) { - System.err.println("Error: Must provide exactly one partition ref as an argument"); - System.exit(1); - } - - String partitionRef = args[0]; - String[] pathParts = partitionRef.split("/"); - String[] upstreams = Arrays.stream(pathParts[pathParts.length - 1].split("_")) - .map(part -> BASE_PATH + "generated_number/" + part) - .toArray(String[]::new); - - // Create and populate JobConfig object - JobConfig config = new JobConfig(); - config.outputs = Collections.singletonList(BASE_PATH + "sum/" +partitionRef); - config.inputs = Arrays.stream(upstreams) - .map(upstream -> { - DataDep dep = new DataDep(); - dep.depType = "materialize"; - dep.ref = upstream; - return dep; - }) - .collect(Collectors.toList()); - config.args = Arrays.asList(upstreams); - // Create a hashmap for env with {"OUTPUT_REF": "foo"} - config.env = Collections.singletonMap("OUTPUT_REF", args[0]); - // inputs and env are already initialized as empty collections in the constructor - - try { - ObjectMapper mapper = new ObjectMapper(); - - // Convert config to JsonNode and serialize - JsonNode configNode = mapper.valueToTree(Collections.singletonList(config)); - String jsonConfig = mapper.writeValueAsString(configNode); - System.out.println(jsonConfig); - } catch (Exception e) { - 
System.err.println("Error: Failed to validate or serialize config: " + e.getMessage()); - System.exit(1); - } - } -} diff --git a/examples/basic_graph/SumExecute.java b/examples/basic_graph/SumExecute.java deleted file mode 100644 index f45acae..0000000 --- a/examples/basic_graph/SumExecute.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.databuild.examples.basic_graph; - -import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH; - -public class SumExecute { - public static void main(String[] args) { - if (args.length < 1) { - System.err.println("Error: Partition ref (output path) is required"); - System.exit(1); - } - - // Get output ref from env var OUTPUT_REF - String outputRef = System.getenv("OUTPUT_REF"); - - // For each arg, load it from the file system and add it to the sum - int sum = 0; - for (String partitionRef : args) { - try { - String path = partitionRef; - int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path)))); - System.out.println("Summing partition " + partitionRef + " with value " + partitionValue); - sum += partitionValue; - } catch (Exception e) { - System.err.println("Error: Failed to read partition " + partitionRef + ": " + e.getMessage()); - e.printStackTrace(); - } - } - System.out.println("Sum of " + args.length + " partitions: " + sum); - // Write the sum to the output file - String outPath = outputRef; - System.out.println("Writing sum " + sum + " to " + outPath); - - java.io.File parent = new java.io.File(outPath).getParentFile(); - if (parent != null) { - parent.mkdirs(); - } - - try (java.io.FileWriter writer = new java.io.FileWriter(outPath)) { - writer.write(String.valueOf(sum)); - } catch (Exception e) { - System.err.println("Error: Failed to write sum to " + outputRef + ": " + e.getMessage()); - } - } -} diff --git a/examples/basic_graph/UnifiedSum.java b/examples/basic_graph/UnifiedSum.java new file mode 100644 index 0000000..b950edc --- /dev/null +++ 
b/examples/basic_graph/UnifiedSum.java @@ -0,0 +1,130 @@ +package com.databuild.examples.basic_graph; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.Collectors; +import java.io.FileWriter; +import java.io.IOException; + +// import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH; + +/** + * Unified sum job that handles both configuration and execution via subcommands. + */ +public class UnifiedSum { + public static String BASE_PATH = "/tmp/databuild_test/examples/basic_graph/"; + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("Usage: UnifiedSum {config|exec} [args...]"); + System.exit(1); + } + + String command = args[0]; + switch (command) { + case "config": + handleConfig(Arrays.copyOfRange(args, 1, args.length)); + break; + case "exec": + handleExec(Arrays.copyOfRange(args, 1, args.length)); + break; + default: + System.err.println("Unknown command: " + command); + System.err.println("Usage: UnifiedSum {config|exec} [args...]"); + System.exit(1); + } + } + + private static void handleConfig(String[] args) { + if (args.length != 1) { + System.err.println("Config mode requires exactly one partition ref"); + System.exit(1); + } + + String partitionRef = args[0]; + String[] pathParts = partitionRef.split("/"); + String[] upstreams = Arrays.stream(pathParts[pathParts.length - 1].split("_")) + .map(part -> BASE_PATH + "generated_number/" + part) + .toArray(String[]::new); + + try { + ObjectMapper mapper = new ObjectMapper(); + + // Create data dependencies + var inputs = mapper.createArrayNode(); + for (String upstream : upstreams) { + var dataDep = mapper.createObjectNode(); + dataDep.put("dep_type", 0); // QUERY + var partRef = mapper.createObjectNode(); + partRef.put("str", upstream); + dataDep.set("partition_ref", partRef); + 
inputs.add(dataDep); + } + + // Create job configuration + var config = mapper.createObjectNode(); + config.set("outputs", mapper.createArrayNode().add(partitionRef)); + config.set("inputs", inputs); + var argsArray = mapper.createArrayNode(); + for (String upstream : upstreams) { + argsArray.add(upstream); + } + config.set("args", argsArray); + config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef)); + + var response = mapper.createObjectNode(); + response.set("configs", mapper.createArrayNode().add(config)); + + System.out.println(mapper.writeValueAsString(response)); + } catch (Exception e) { + System.err.println("Error creating config: " + e.getMessage()); + System.exit(1); + } + } + + private static void handleExec(String[] args) { + // Get output ref from env var OUTPUT_REF + String outputRef = System.getenv("OUTPUT_REF"); + + if (outputRef == null) { + System.err.println("Error: OUTPUT_REF environment variable is required"); + System.exit(1); + } + + // For each arg, load it from the file system and add it to the sum + int sum = 0; + for (String partitionRef : args) { + try { + String path = partitionRef; + int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path)))); + System.out.println("Summing partition " + partitionRef + " with value " + partitionValue); + sum += partitionValue; + } catch (Exception e) { + System.err.println("Error: Failed to read partition " + partitionRef + ": " + e.getMessage()); + e.printStackTrace(); + System.exit(1); + } + } + + System.out.println("Sum of " + args.length + " partitions: " + sum); + + // Write the sum to the output file + try { + File outputDir = new File(outputRef).getParentFile(); + if (outputDir != null) { + outputDir.mkdirs(); + } + try (FileWriter writer = new FileWriter(outputRef)) { + writer.write(String.valueOf(sum)); + } + System.out.println("Wrote sum " + sum + " to " + outputRef); + } catch (Exception e) { + 
System.err.println("Error writing output: " + e.getMessage()); + System.exit(1); + } + } +} \ No newline at end of file diff --git a/examples/basic_job/configure.sh b/examples/basic_job/configure.sh deleted file mode 100755 index d69fa5b..0000000 --- a/examples/basic_job/configure.sh +++ /dev/null @@ -1,2 +0,0 @@ -# Create a test job config -echo "{\"configs\":[{\"outputs\":[\"$1\"],\"inputs\":[],\"args\":[\"will\", \"build\", \"$1\"],\"env\":{\"foo\":\"bar\"}}]}" diff --git a/examples/basic_job/execute.sh b/examples/basic_job/execute.sh deleted file mode 100755 index 921637d..0000000 --- a/examples/basic_job/execute.sh +++ /dev/null @@ -1,3 +0,0 @@ -echo 'EXECUTE!' -echo "foo=$foo" -echo "args=$@" \ No newline at end of file diff --git a/examples/podcast_reviews/BUILD.bazel b/examples/podcast_reviews/BUILD.bazel index 6a7d69a..3d2674f 100644 --- a/examples/podcast_reviews/BUILD.bazel +++ b/examples/podcast_reviews/BUILD.bazel @@ -11,20 +11,13 @@ compile_pip_requirements( databuild_job( name = "test_job", - configure = ":test_job_configure", - execute = ":test_job_execute", + binary = ":test_job_binary", ) py_binary( - name = "test_job_configure", - srcs = ["configure.py"], - main = "configure.py", -) - -py_binary( - name = "test_job_execute", - srcs = ["execute.py"], - main = "execute.py", + name = "test_job_binary", + srcs = ["unified_job.py"], + main = "unified_job.py", ) py_repl( diff --git a/examples/podcast_reviews/configure.py b/examples/podcast_reviews/configure.py deleted file mode 100644 index 2ddce4b..0000000 --- a/examples/podcast_reviews/configure.py +++ /dev/null @@ -1,2 +0,0 @@ - -print("Hello, gorgeous.") diff --git a/examples/podcast_reviews/execute.py b/examples/podcast_reviews/execute.py deleted file mode 100644 index ef6ae51..0000000 --- a/examples/podcast_reviews/execute.py +++ /dev/null @@ -1,2 +0,0 @@ - -print("What a time to be alive.") diff --git a/examples/podcast_reviews/unified_job.py b/examples/podcast_reviews/unified_job.py new file 
mode 100644 index 0000000..db82e35 --- /dev/null +++ b/examples/podcast_reviews/unified_job.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +import sys +import json +import os + +def main(): + if len(sys.argv) < 2: + print("Usage: unified_job.py {config|exec} [args...]", file=sys.stderr) + sys.exit(1) + + command = sys.argv[1] + + if command == "config": + handle_config(sys.argv[2:]) + elif command == "exec": + handle_exec(sys.argv[2:]) + else: + print(f"Unknown command: {command}", file=sys.stderr) + print("Usage: unified_job.py {config|exec} [args...]", file=sys.stderr) + sys.exit(1) + +def handle_config(args): + if len(args) < 1: + print("Config mode requires partition ref", file=sys.stderr) + sys.exit(1) + + partition_ref = args[0] + + config = { + "configs": [{ + "outputs": [partition_ref], + "inputs": [], + "args": ["Hello", "gorgeous", partition_ref], + "env": {"PARTITION_REF": partition_ref} + }] + } + + print(json.dumps(config)) + +def handle_exec(args): + print("What a time to be alive.") + print(f"Partition ref: {os.getenv('PARTITION_REF', 'unknown')}") + print(f"Args: {args}") + +if __name__ == "__main__": + main() \ No newline at end of file