""" Simple Python DSL example - basic data processing pipeline """ from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder, PartitionPattern from databuild.proto import JobConfig from datetime import date import os # Define the graph graph = DataBuildGraph("//:simple_graph") class RawDataPartition(PartitionPattern): """Represents raw input data for a specific date""" _raw_pattern = r"raw_data/date=(?P\d{4}-\d{2}-\d{2})" def __init__(self, date: str): self.date = date def serialize(self) -> str: return f"raw_data/date={self.date}" class ProcessedDataPartition(PartitionPattern): """Represents processed data for a specific date""" _raw_pattern = r"processed_data/date=(?P\d{4}-\d{2}-\d{2})" def __init__(self, date: str): self.date = date def serialize(self) -> str: return f"processed_data/date={self.date}" class SummaryPartition(PartitionPattern): """Represents summary data for a specific date""" _raw_pattern = r"summary/date=(?P\d{4}-\d{2}-\d{2})" def __init__(self, date: str): self.date = date def serialize(self) -> str: return f"summary/date={self.date}" @graph.job class IngestRawData(DataBuildJob): """Job to ingest raw data for a given date""" output_types = [RawDataPartition] def config(self, outputs: list[RawDataPartition]) -> list[JobConfig]: configs = [] for output in outputs: env = {"DATA_DATE": output.date} configs.append( JobConfigBuilder() .add_outputs(output) .set_env(env) .add_args("--date", output.date) .build() ) return configs def exec(self, *args: str) -> None: # Simple implementation - just create a dummy file data_date = os.environ["DATA_DATE"] print(f"Ingesting raw data for {data_date}") # In a real job, this would read from external sources print(f"Raw data ingested successfully for {data_date}") @graph.job class ProcessData(DataBuildJob): """Job to process raw data into processed format""" output_types = [ProcessedDataPartition] def config(self, outputs: list[ProcessedDataPartition]) -> list[JobConfig]: configs = [] for output in outputs: raw_input = RawDataPartition(date=output.date) env = {"DATA_DATE": output.date} configs.append( JobConfigBuilder() .add_outputs(output) .add_inputs(raw_input) .set_env(env) .add_args("--date", output.date) .build() ) return configs def exec(self, *args: str) -> None: data_date = os.environ["DATA_DATE"] print(f"Processing data for {data_date}") # In a real job, this would transform the raw data print(f"Data processed successfully for {data_date}") @graph.job class CreateSummary(DataBuildJob): """Job to create summary from processed data""" output_types = [SummaryPartition] def config(self, outputs: list[SummaryPartition]) -> list[JobConfig]: configs = [] for output in outputs: processed_input = ProcessedDataPartition(date=output.date) env = {"DATA_DATE": output.date} configs.append( JobConfigBuilder() .add_outputs(output) .add_inputs(processed_input) .set_env(env) .add_args("--date", output.date) .build() ) return configs def exec(self, *args: str) -> None: data_date = os.environ["DATA_DATE"] print(f"Creating summary for {data_date}") # In a real job, this would generate summary statistics print(f"Summary created successfully for {data_date}")