137 lines
No EOL
5.1 KiB
Java
137 lines
No EOL
5.1 KiB
Java
package com.databuild.examples.basic_graph;
|
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;
|
|
|
|
// import static com.databuild.examples.basic_graph.GenerateExecute.BASE_PATH;
|
|
|
|
/**
|
|
* Unified sum job that handles both configuration and execution via subcommands.
|
|
*/
|
|
public class UnifiedSum {
|
|
public static String BASE_PATH = "/tmp/databuild_test/examples/basic_graph/";
|
|
|
|
public static void main(String[] args) {
|
|
if (args.length < 1) {
|
|
System.err.println("Usage: UnifiedSum {config|exec} [args...]");
|
|
System.exit(1);
|
|
}
|
|
|
|
String command = args[0];
|
|
switch (command) {
|
|
case "config":
|
|
handleConfig(Arrays.copyOfRange(args, 1, args.length));
|
|
break;
|
|
case "exec":
|
|
handleExec(Arrays.copyOfRange(args, 1, args.length));
|
|
break;
|
|
default:
|
|
System.err.println("Unknown command: " + command);
|
|
System.err.println("Usage: UnifiedSum {config|exec} [args...]");
|
|
System.exit(1);
|
|
}
|
|
}
|
|
|
|
private static void handleConfig(String[] args) {
|
|
if (args.length != 1) {
|
|
System.err.println("Config mode requires exactly one partition ref");
|
|
System.exit(1);
|
|
}
|
|
|
|
String partitionRef = args[0];
|
|
String[] pathParts = partitionRef.split("/");
|
|
String[] upstreams = Arrays.stream(pathParts[pathParts.length - 1].split("_"))
|
|
.map(part -> BASE_PATH + "generated_number/" + part)
|
|
.toArray(String[]::new);
|
|
|
|
try {
|
|
ObjectMapper mapper = new ObjectMapper();
|
|
|
|
// Create data dependencies
|
|
var inputs = mapper.createArrayNode();
|
|
for (String upstream : upstreams) {
|
|
var dataDep = mapper.createObjectNode();
|
|
dataDep.put("dep_type", 0); // QUERY
|
|
var partRef = mapper.createObjectNode();
|
|
partRef.put("str", upstream);
|
|
dataDep.set("partition_ref", partRef);
|
|
inputs.add(dataDep);
|
|
}
|
|
|
|
// Create job configuration
|
|
var config = mapper.createObjectNode();
|
|
|
|
// Create outputs as PartitionRef objects
|
|
var outputs = mapper.createArrayNode();
|
|
var outputPartRef = mapper.createObjectNode();
|
|
outputPartRef.put("str", partitionRef);
|
|
outputs.add(outputPartRef);
|
|
config.set("outputs", outputs);
|
|
|
|
config.set("inputs", inputs);
|
|
var argsArray = mapper.createArrayNode();
|
|
for (String upstream : upstreams) {
|
|
argsArray.add(upstream);
|
|
}
|
|
config.set("args", argsArray);
|
|
config.set("env", mapper.createObjectNode().put("OUTPUT_REF", partitionRef));
|
|
|
|
var response = mapper.createObjectNode();
|
|
response.set("configs", mapper.createArrayNode().add(config));
|
|
|
|
System.out.println(mapper.writeValueAsString(response));
|
|
} catch (Exception e) {
|
|
System.err.println("Error creating config: " + e.getMessage());
|
|
System.exit(1);
|
|
}
|
|
}
|
|
|
|
private static void handleExec(String[] args) {
|
|
// Get output ref from env var OUTPUT_REF
|
|
String outputRef = System.getenv("OUTPUT_REF");
|
|
|
|
if (outputRef == null) {
|
|
System.err.println("Error: OUTPUT_REF environment variable is required");
|
|
System.exit(1);
|
|
}
|
|
|
|
// For each arg, load it from the file system and add it to the sum
|
|
int sum = 0;
|
|
for (String partitionRef : args) {
|
|
try {
|
|
String path = partitionRef;
|
|
int partitionValue = Integer.parseInt(new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path))));
|
|
System.out.println("Summing partition " + partitionRef + " with value " + partitionValue);
|
|
sum += partitionValue;
|
|
} catch (Exception e) {
|
|
System.err.println("Error: Failed to read partition " + partitionRef + ": " + e.getMessage());
|
|
e.printStackTrace();
|
|
System.exit(1);
|
|
}
|
|
}
|
|
|
|
System.out.println("Sum of " + args.length + " partitions: " + sum);
|
|
|
|
// Write the sum to the output file
|
|
try {
|
|
File outputDir = new File(outputRef).getParentFile();
|
|
if (outputDir != null) {
|
|
outputDir.mkdirs();
|
|
}
|
|
try (FileWriter writer = new FileWriter(outputRef)) {
|
|
writer.write(String.valueOf(sum));
|
|
}
|
|
System.out.println("Wrote sum " + sum + " to " + outputRef);
|
|
} catch (Exception e) {
|
|
System.err.println("Error writing output: " + e.getMessage());
|
|
System.exit(1);
|
|
}
|
|
}
|
|
} |