databuild/graph/analyze.py
import os
import subprocess
import sys

from pydantic import TypeAdapter
from pydantic.dataclasses import dataclass
from typing_extensions import Literal

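# Environment contract (assumed from how these values are used below):
#   DATABUILD_MODE            - one of "plan", "lookup", "import_test"
#   DATABUILD_JOB_LOOKUP_PATH - executable that maps partition refs to job labels
#   DATABUILD_CANDIDATE_JOBS  - comma-separated job labels eligible for planning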
MODE = os.environ["DATABUILD_MODE"]
LOOKUP_PATH = os.environ["DATABUILD_JOB_LOOKUP_PATH"]
CANDIDATE_JOBS = os.environ["DATABUILD_CANDIDATE_JOBS"].split(",")


class ImmutableConfig:
    frozen = True


@dataclass(config=ImmutableConfig)
class DataDep:
    depType: Literal["query", "materialize"]
    ref: str


@dataclass(config=ImmutableConfig)
class JobConfig:
    inputs: list[DataDep]
    outputs: list[str]
    args: list[str]
    env: dict[str, str]
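
    # Lists and dicts are unhashable, so hash a tuple-ified snapshot of the
    # fields instead; this lets configs (and the Tasks wrapping them) be
    # collected into sets during planning.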
    def __hash__(self):
        return hash((
            tuple(self.inputs),
            tuple(self.outputs),
            tuple(self.args),
            tuple(sorted(self.env.items())),
        ))


@dataclass(config=ImmutableConfig)
class Task:
    jobLabel: str
    config: JobConfig


@dataclass(config=ImmutableConfig)
class JobGraph:
    outputs: list[str]
    nodes: list[Task]


@dataclass(config=ImmutableConfig)
class PartitionManifest:
    outputs: str
    inputs: list[DataDep]
    startTime: int
    endTime: int
    # TODO should be a runnable?
    config: JobConfig
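

# Job labels appear to follow a "package:name" convention (assumed from the
# replace below), so "jobs/daily:aggregate" resolves to "jobs/daily/aggregate.cfg".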
def job_label_to_cfg_path(job_label: str) -> str:
    return job_label.replace(":", "/") + ".cfg"
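

# Each job's .cfg executable takes the desired output refs as argv and is
# expected to print a JSON list of job configs, e.g. (shape assumed from the
# JobConfig schema above; values illustrative):
#   [{"inputs": [{"depType": "query", "ref": "db/users"}],
#     "outputs": ["reports/2025-04-18"], "args": ["--date", "2025-04-18"], "env": {}}]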
def configure(job_label: str, output_refs: list[str]) -> list[Task]:
    """
    Configures the specified job to produce the desired outputs, returning the list of tasks that will achieve this.
    """
    assert job_label in CANDIDATE_JOBS, f"Job `{job_label}` is not a candidate job"
    proc = subprocess.run(
        [job_label_to_cfg_path(job_label), *output_refs],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run job config: {proc.stderr}")
    job_configs = TypeAdapter(list[JobConfig]).validate_json(proc.stdout)
    return [Task(jobLabel=job_label, config=cfg) for cfg in job_configs]
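

# The lookup executable likewise takes partition refs as argv and prints a JSON
# mapping of job label -> refs that job will produce, e.g. (shape assumed from
# the validation below; values illustrative):
#   {"jobs/daily:aggregate": ["reports/2025-04-18"]}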
def resolve(output_refs: set[str]) -> dict[str, list[str]]:
    """
    Produces a mapping from each required job label to the partition refs it produces.
    """
    proc = subprocess.run(
        [LOOKUP_PATH, *output_refs],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run job lookup: {proc.stderr}")
    return TypeAdapter(dict[str, list[str]]).validate_json(proc.stdout)
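

# Planning iterates to a fixpoint: resolve jobs for the outstanding refs,
# configure them, then queue any "materialize" inputs of the new tasks as the
# next epoch's refs. The epoch cap guards against cyclic or non-converging
# dependency chains.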
def plan(output_refs: list[str]) -> JobGraph:
    unhandled_refs = set(output_refs)
    epoch = 0
    nodes = set()
    while unhandled_refs:
        if epoch >= 1000:
            raise RuntimeError(f"Still planning after {epoch} epochs, giving up")
        # Resolve jobs for all outstanding refs, then configure each one
        new_nodes = set()
        for job_label, produced_refs in resolve(output_refs=unhandled_refs).items():
            # TODO do we need to try and merge jobs later? E.g. discovering a partition ref in a later epoch that can
            # be produced by an earlier resolved job config
            new_nodes.update(configure(job_label=job_label, output_refs=produced_refs))
            unhandled_refs -= set(produced_refs)
        assert not unhandled_refs, f"Should have no unhandled refs after configuration phase, but had: {unhandled_refs}"
        nodes |= new_nodes
        epoch += 1
        # Plan next epoch
        unhandled_refs = set(
            input_dep.ref
            for task in new_nodes
            for input_dep in task.config.inputs
            if input_dep.depType == "materialize"
        )
    if nodes:
        return JobGraph(
            outputs=output_refs,
            nodes=list(nodes),
        )
    else:
        raise RuntimeError("Planning produced no tasks for the requested outputs")


def main():
    if MODE == "plan":
        print(TypeAdapter(JobGraph).dump_json(plan(sys.argv[1:])).decode())
    elif MODE == "lookup":
        print(TypeAdapter(dict[str, list[str]]).dump_json(resolve(set(sys.argv[1:]))).decode())
    elif MODE == "import_test":
        print("ok :)")
    else:
        raise RuntimeError(f"Unknown MODE `{MODE}`")


if __name__ == '__main__':
    main()
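
# Example invocation (paths and labels are illustrative):
#   DATABUILD_MODE=plan \
#   DATABUILD_JOB_LOOKUP_PATH=./lookup \
#   DATABUILD_CANDIDATE_JOBS=jobs/daily:aggregate \
#   python databuild/graph/analyze.py reports/2025-04-18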