package com.databuild.examples.basic_graph; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.util.Arrays; import java.util.Collections; import java.util.stream.Collectors; import java.io.FileWriter; import java.io.IOException; // import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH; /** * Unified sum job that handles both configuration and execution via subcommands. */ public class UnifiedSum { public static String BASE_PATH = "/tmp/databuild_test/examples/basic_graph/"; public static void main(String[] args) { if (args.length < 1) { System.err.println("Usage: UnifiedSum {config|exec} [args...]"); System.exit(1); } String command = args[0]; switch (command) { case "config": handleConfig(Arrays.copyOfRange(args, 1, args.length)); break; case "exec": handleExec(Arrays.copyOfRange(args, 1, args.length)); break; default: System.err.println("Unknown command: " + command); System.err.println("Usage: UnifiedSum {config|exec} [args...]"); System.exit(1); } } private static void handleConfig(String[] args) { if (args.length != 1) { System.err.println("Config mode requires exactly one partition ref"); System.exit(1); } String partitionRef = args[0]; String[] pathParts = partitionRef.split("/"); String[] upstreams = Arrays.stream(pathParts[pathParts.length - 1].split("_")) .map(part -> BASE_PATH + "generated_number/" + part) .toArray(String[]::new); try { ObjectMapper mapper = new ObjectMapper(); // Create data dependencies var inputs = mapper.createArrayNode(); for (String upstream : upstreams) { var dataDep = mapper.createObjectNode(); dataDep.put("dep_type", 0); // QUERY var partRef = mapper.createObjectNode(); partRef.put("str", upstream); dataDep.set("partition_ref", partRef); inputs.add(dataDep); } // Create job configuration var config = mapper.createObjectNode(); // Create outputs as PartitionRef objects var outputs = mapper.createArrayNode(); var outputPartRef = mapper.createObjectNode(); outputPartRef.put("str", partitionRef); outputs.add(outputPartRef); config.set("outputs", outputs); config.set("inputs", inputs); var argsArray = mapper.createArrayNode(); for (String upstream : upstreams) { argsArray.add(upstream); } config.set("args", argsArray); config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef)); var response = mapper.createObjectNode(); response.set("configs", mapper.createArrayNode().add(config)); System.out.println(mapper.writeValueAsString(response)); } catch (Exception e) { System.err.println("Error creating config: " + e.getMessage()); System.exit(1); } } private static void handleExec(String[] args) { // Get output ref from env var OUTPUT_REF String outputRef = System.getenv("OUTPUT_REF"); if (outputRef == null) { System.err.println("Error: OUTPUT_REF environment variable is required"); System.exit(1); } // For each arg, load it from the file system and add it to the sum int sum = 0; for (String partitionRef : args) { try { String path = partitionRef; int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path)))); System.out.println("Summing partition " + partitionRef + " with value " + partitionValue); sum += partitionValue; } catch (Exception e) { System.err.println("Error: Failed to read partition " + partitionRef + ": " + e.getMessage()); e.printStackTrace(); System.exit(1); } } System.out.println("Sum of " + args.length + " partitions: " + sum); // Write the sum to the output file try { File outputDir = new File(outputRef).getParentFile(); if (outputDir != null) { outputDir.mkdirs(); } try (FileWriter writer = new FileWriter(outputRef)) { writer.write(String.valueOf(sum)); } System.out.println("Wrote sum " + sum + " to " + outputRef); } catch (Exception e) { System.err.println("Error writing output: " + e.getMessage()); System.exit(1); } } }