databuild/examples/basic_graph/UnifiedSum.java
Stuart Axelbrooke 48a1288891
Some checks are pending
/ setup (push) Waiting to run
Clean up old paths
2025-06-29 20:14:33 -07:00

130 lines
No EOL
4.8 KiB
Java

package com.databuild.examples.basic_graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;
import java.io.FileWriter;
import java.io.IOException;
// import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH;
/**
* Unified sum job that handles both configuration and execution via subcommands.
*/
public class UnifiedSum {
public static String BASE_PATH = "/tmp/databuild_test/examples/basic_graph/";
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: UnifiedSum {config|exec} [args...]");
System.exit(1);
}
String command = args[0];
switch (command) {
case "config":
handleConfig(Arrays.copyOfRange(args, 1, args.length));
break;
case "exec":
handleExec(Arrays.copyOfRange(args, 1, args.length));
break;
default:
System.err.println("Unknown command: " + command);
System.err.println("Usage: UnifiedSum {config|exec} [args...]");
System.exit(1);
}
}
private static void handleConfig(String[] args) {
if (args.length != 1) {
System.err.println("Config mode requires exactly one partition ref");
System.exit(1);
}
String partitionRef = args[0];
String[] pathParts = partitionRef.split("/");
String[] upstreams = Arrays.stream(pathParts[pathParts.length - 1].split("_"))
.map(part -> BASE_PATH + "generated_number/" + part)
.toArray(String[]::new);
try {
ObjectMapper mapper = new ObjectMapper();
// Create data dependencies
var inputs = mapper.createArrayNode();
for (String upstream : upstreams) {
var dataDep = mapper.createObjectNode();
dataDep.put("dep_type", 0); // QUERY
var partRef = mapper.createObjectNode();
partRef.put("str", upstream);
dataDep.set("partition_ref", partRef);
inputs.add(dataDep);
}
// Create job configuration
var config = mapper.createObjectNode();
config.set("outputs", mapper.createArrayNode().add(partitionRef));
config.set("inputs", inputs);
var argsArray = mapper.createArrayNode();
for (String upstream : upstreams) {
argsArray.add(upstream);
}
config.set("args", argsArray);
config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef));
var response = mapper.createObjectNode();
response.set("configs", mapper.createArrayNode().add(config));
System.out.println(mapper.writeValueAsString(response));
} catch (Exception e) {
System.err.println("Error creating config: " + e.getMessage());
System.exit(1);
}
}
private static void handleExec(String[] args) {
// Get output ref from env var OUTPUT_REF
String outputRef = System.getenv("OUTPUT_REF");
if (outputRef == null) {
System.err.println("Error: OUTPUT_REF environment variable is required");
System.exit(1);
}
// For each arg, load it from the file system and add it to the sum
int sum = 0;
for (String partitionRef : args) {
try {
String path = partitionRef;
int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path))));
System.out.println("Summing partition " + partitionRef + " with value " + partitionValue);
sum += partitionValue;
} catch (Exception e) {
System.err.println("Error: Failed to read partition " + partitionRef + ": " + e.getMessage());
e.printStackTrace();
System.exit(1);
}
}
System.out.println("Sum of " + args.length + " partitions: " + sum);
// Write the sum to the output file
try {
File outputDir = new File(outputRef).getParentFile();
if (outputDir != null) {
outputDir.mkdirs();
}
try (FileWriter writer = new FileWriter(outputRef)) {
writer.write(String.valueOf(sum));
}
System.out.println("Wrote sum " + sum + " to " + outputRef);
} catch (Exception e) {
System.err.println("Error writing output: " + e.getMessage());
System.exit(1);
}
}
}