import os
import subprocess
import sys

from pydantic import TypeAdapter
from pydantic.dataclasses import dataclass
from typing_extensions import Literal

# Required environment configuration; a missing variable fails fast at import.
MODE = os.environ["DATABUILD_MODE"]
LOOKUP_PATH = os.environ["DATABUILD_JOB_LOOKUP_PATH"]
CANDIDATE_JOBS = os.environ["DATABUILD_CANDIDATE_JOBS"].split(",")


class ImmutableConfig:
    # Pydantic dataclass config: instances are immutable (and therefore
    # usable in sets/dict keys where hashable).
    frozen = True


@dataclass(config=ImmutableConfig)
class DataDep:
    # depType: "query" = read-only dependency; "materialize" = must be
    # produced by another job before this one can run.
    depType: Literal["query", "materialize"]
    ref: str


@dataclass(config=ImmutableConfig)
class JobConfig:
    inputs: list[DataDep]
    outputs: list[str]
    args: list[str]
    env: dict[str, str]

    def __hash__(self):
        # list/dict fields are unhashable, so hash order-sensitive tuples of
        # the list fields and an order-insensitive view of `env`.
        return hash((
            tuple(self.inputs),
            tuple(self.outputs),
            tuple(self.args),
            tuple(sorted(self.env.items())),
        ))


@dataclass(config=ImmutableConfig)
class Task:
    jobLabel: str
    config: JobConfig


@dataclass(config=ImmutableConfig)
class JobGraph:
    outputs: list[str]
    nodes: list[Task]


@dataclass(config=ImmutableConfig)
class PartitionManifest:
    outputs: str
    inputs: list[DataDep]
    startTime: int
    endTime: int
    # TODO should be a runnable?
    config: JobConfig


def job_label_to_cfg_path(job_label: str) -> str:
    """Translate a job label such as `a:b:c` into its config binary path `a/b/c.cfg`."""
    return job_label.replace(":", "/") + ".cfg"


def configure(job_label: str, output_refs: list[str]) -> list[Task]:
    """
    Configures the specified job to produce the desired outputs, returning
    the list of tasks that will achieve this.

    Raises:
        AssertionError: if `job_label` is not in CANDIDATE_JOBS.
        RuntimeError: if the job's config binary exits non-zero.
    """
    assert job_label in CANDIDATE_JOBS, f"Job `{job_label}` is not a candidate job"
    proc = subprocess.run(
        [job_label_to_cfg_path(job_label), *output_refs],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run job config: {proc.stderr}")
    # The config binary emits a JSON list of job configs on stdout.
    job_configs = TypeAdapter(list[JobConfig]).validate_json(proc.stdout)
    return [Task(jobLabel=job_label, config=cfg) for cfg in job_configs]


def resolve(output_refs: set[str]) -> dict[str, list[str]]:
    """
    Produces a mapping of required job labels to the partitions each produces.

    Raises:
        RuntimeError: if the lookup binary exits non-zero.
    """
    proc = subprocess.run(
        [LOOKUP_PATH, *output_refs],
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to run job lookup: {proc.stderr}")
    return TypeAdapter(dict[str, list[str]]).validate_json(proc.stdout)


def plan(output_refs: list[str]) -> JobGraph:
    """
    Build the job graph required to materialize `output_refs`.

    Iteratively resolves refs to jobs and configures those jobs, then repeats
    for any `materialize` inputs the newly configured tasks require, until no
    unhandled refs remain (or a 1000-epoch safety cap is hit).
    """
    unhandled_refs = set(output_refs)
    produced_refs_so_far: set[str] = set()
    epoch = 0
    nodes: set[Task] = set()
    while unhandled_refs:
        if epoch >= 1000:
            raise RuntimeError(f"Still planning after {epoch} epochs, giving up")

        # Resolve jobs for all currently unhandled refs.
        new_nodes: set[Task] = set()
        for job_label, produced_refs in resolve(output_refs=unhandled_refs).items():
            # TODO do we need to try and merge jobs later? E.g. discovering a
            # partition ref in a later epoch that can be produced by an
            # earlier resolved job config
            # BUG FIX: was `new_nodes += configure(...)`, which raises
            # TypeError (set += list is unsupported).
            new_nodes.update(configure(job_label=job_label, output_refs=produced_refs))
            produced_refs_so_far.update(produced_refs)
            unhandled_refs -= set(produced_refs)
        # BUG FIX: message previously read "Should have unhandled refs",
        # the inverse of what the condition asserts.
        assert not unhandled_refs, f"Should not have unhandled refs after configuration phase, but had: {unhandled_refs}"
        # BUG FIX: `nodes` was never populated, so planning always fell
        # through to the "Unknown failure" branch.
        nodes.update(new_nodes)
        epoch += 1

        # Plan the next epoch: inputs that must themselves be materialized.
        # BUG FIX: skip refs already produced in an earlier epoch so shared
        # dependencies are not re-resolved (and cycles don't spin until the
        # epoch cap).
        unhandled_refs = set(
            input_dep.ref
            for task in new_nodes
            for input_dep in task.config.inputs
            if input_dep.depType == "materialize"
        ) - produced_refs_so_far
    if nodes:
        return JobGraph(
            outputs=output_refs,
            nodes=list(nodes),
        )
    else:
        raise RuntimeError("Unknown failure in graph planning")


def main():
    """Dispatch on DATABUILD_MODE; remaining argv entries are partition refs."""
    if MODE == "plan":
        # BUG FIX: dump_json returns bytes; decode so stdout carries clean
        # JSON text instead of Python's b'...' repr.
        print(TypeAdapter(JobGraph).dump_json(plan(sys.argv[1:])).decode())
    elif MODE == "lookup":
        print(TypeAdapter(dict[str, list[str]]).dump_json(resolve(sys.argv[1:])).decode())
    elif MODE == "import_test":
        print("ok :)")
    else:
        raise RuntimeError(f"Unknown MODE `{MODE}`")


if __name__ == '__main__':
    main()