databuild/databuild/dsl/python/test/dsl_test.py
Stuart Axelbrooke 6f2408a3ee
Some checks are pending
/ setup (push) Waiting to run
lay groundwork for python dsl
2025-07-30 07:01:46 -07:00

73 lines
2.5 KiB
Python

from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob, JobConfig, PartitionManifest
from dataclasses import dataclass
import pytest
@dataclass
class DateCategory:
    """Field pair shared by the partition classes in this test module.

    The partition classes defined in this file inherit these two fields, and
    their regex patterns refer to them through same-named capture groups.
    """

    # Date as a string; the tests in this file use ``YYYY-MM-DD`` values.
    data_date: str
    # Category label, e.g. "comedy" or "technology" in the tests.
    category: str
class CategoryAnalysisPartition(DateCategory, PartitionPattern):
    """Well-formed partition fixture keyed by category and date.

    The pattern's named groups (``category`` and ``data_date``) line up
    one-to-one with the fields inherited from ``DateCategory``.
    """

    _raw_pattern = r"category_analysis/category=(?P<category>[^/]+)/date=(?P<data_date>\d{4}-\d{2}-\d{2})"
def test_basic_partition_pattern():
    """Check serialize() and deserialize() on a well-formed partition pattern."""
    # Fields -> partition string.
    built = CategoryAnalysisPartition(data_date="2025-01-01", category="comedy")
    serialized = built.serialize()
    assert serialized == "category_analysis/category=comedy/date=2025-01-01"

    # Partition string -> fields.
    parsed = CategoryAnalysisPartition.deserialize(
        "category_analysis/category=technology/date=2025-01-02"
    )
    assert (parsed.data_date, parsed.category) == ("2025-01-02", "technology")
class NotEnoughFieldsPartition(DateCategory, PartitionPattern):
    """Negative fixture: the pattern contains no capture groups at all.

    Neither ``data_date`` nor ``category`` appears in the pattern, and
    ``test_invalid_partition_pattern`` expects ``_validate_pattern()`` to
    raise ``ValueError`` for it.
    """

    _raw_pattern = r"invalid_partition_pattern"
class TooManyFieldsPartition(DateCategory, PartitionPattern):
    """Negative fixture: the pattern names a group that has no matching field.

    Besides ``category`` and ``data_date`` the pattern captures ``hour``,
    which ``DateCategory`` does not declare;
    ``test_invalid_partition_pattern`` expects ``_validate_pattern()`` to
    raise ``ValueError`` for it.
    """

    _raw_pattern = r"category_analysis/category=(?P<category>[^/]+)/date=(?P<data_date>\d{4}-\d{2}-\d{2})/hour=(?P<hour>\d{2})"
def test_invalid_partition_pattern():
    """Patterns whose named groups don't match the fields raise ValueError."""
    field_values = {"data_date": "2025-01-01", "category": "comedy"}

    # Same expectation for both mismatches: too few groups, then too many.
    for broken_cls in (NotEnoughFieldsPartition, TooManyFieldsPartition):
        with pytest.raises(ValueError):
            # Construction is kept inside the context so the check holds
            # whichever step (construction or validation) raises.
            broken_cls(**field_values)._validate_pattern()
def test_basic_graph_definition():
    """Registering one job leaves its output type as the sole lookup entry."""
    graph = DataBuildGraph("//:test_graph")

    # Method bodies are stubs: only registration via the decorator is tested.
    @graph.job
    class TestJob(DataBuildJob):
        output_types = [CategoryAnalysisPartition]

        def exec(self, config: JobConfig) -> PartitionManifest:
            ...

        def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]:
            ...

    registered = graph.lookup
    assert len(registered) == 1
    assert CategoryAnalysisPartition in registered
def test_graph_collision():
    """A second job declaring an already-registered output type is refused."""
    collision_graph = DataBuildGraph("//:test_graph")

    # First registration of the output type goes through normally.
    @collision_graph.job
    class TestJob1(DataBuildJob):
        output_types = [CategoryAnalysisPartition]

        def exec(self, config: JobConfig) -> PartitionManifest:
            ...

        def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]:
            ...

    # Registering another job with the same output type must trip an assertion.
    with pytest.raises(AssertionError):

        @collision_graph.job
        class TestJob2(DataBuildJob):
            output_types = [CategoryAnalysisPartition]

            def exec(self, config: JobConfig) -> PartitionManifest:
                ...

            def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]:
                ...
if __name__ == "__main__":
    # Running the file directly hands it to pytest and exits with pytest's
    # return value as the process status.
    exit_status = pytest.main([__file__])
    raise SystemExit(exit_status)