"""
|
|
Simple Python DSL example - basic data processing pipeline
|
|
"""
|
|
|
|
from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder, PartitionPattern
|
|
from databuild.proto import JobConfig
|
|
from datetime import date
|
|
import os
|
|
|
|
# Define the graph
|
|
graph = DataBuildGraph("//:simple_graph")
|
|
|
|
|
|
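

# Each partition class pairs a regex (_raw_pattern, with a named "date"
# group) with a serialize() method that emits the matching string, so a
# partition ref like "raw_data/date=2024-01-01" can presumably be parsed
# back into the corresponding partition object by the DSL.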
class RawDataPartition(PartitionPattern):
    """Represents raw input data for a specific date"""

    _raw_pattern = r"raw_data/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"raw_data/date={self.date}"


class ProcessedDataPartition(PartitionPattern):
    """Represents processed data for a specific date"""

    _raw_pattern = r"processed_data/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"processed_data/date={self.date}"


class SummaryPartition(PartitionPattern):
    """Represents summary data for a specific date"""

    _raw_pattern = r"summary/date=(?P<date>\d{4}-\d{2}-\d{2})"

    def __init__(self, date: str):
        self.date = date

    def serialize(self) -> str:
        return f"summary/date={self.date}"
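

# Jobs: each declares the partition types it can produce (output_types),
# expands a set of requested outputs into concrete JobConfigs in config(),
# and does the actual work in exec() using the env/args from its config.
# The inputs declared in config() chain the jobs: raw -> processed -> summary.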
@graph.job
class IngestRawData(DataBuildJob):
    """Job to ingest raw data for a given date"""

    output_types = [RawDataPartition]

    def config(self, outputs: list[RawDataPartition]) -> list[JobConfig]:
        # Emit one JobConfig per requested output partition.
        configs = []
        for output in outputs:
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        # Simple implementation - just log the work
        data_date = os.environ["DATA_DATE"]
        print(f"Ingesting raw data for {data_date}")
        # In a real job, this would read from external sources
        print(f"Raw data ingested successfully for {data_date}")


@graph.job
class ProcessData(DataBuildJob):
    """Job to process raw data into processed format"""

    output_types = [ProcessedDataPartition]

    def config(self, outputs: list[ProcessedDataPartition]) -> list[JobConfig]:
        configs = []
        for output in outputs:
            # Each processed partition depends on the raw partition for the same date.
            raw_input = RawDataPartition(date=output.date)
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .add_inputs(raw_input)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        data_date = os.environ["DATA_DATE"]
        print(f"Processing data for {data_date}")
        # In a real job, this would transform the raw data
        print(f"Data processed successfully for {data_date}")


@graph.job
class CreateSummary(DataBuildJob):
    """Job to create summary from processed data"""

    output_types = [SummaryPartition]

    def config(self, outputs: list[SummaryPartition]) -> list[JobConfig]:
        configs = []
        for output in outputs:
            # Summaries are derived from the processed partition for the same date.
            processed_input = ProcessedDataPartition(date=output.date)
            env = {"DATA_DATE": output.date}
            configs.append(
                JobConfigBuilder()
                .add_outputs(output)
                .add_inputs(processed_input)
                .set_env(env)
                .add_args("--date", output.date)
                .build()
            )
        return configs

    def exec(self, *args: str) -> None:
        data_date = os.environ["DATA_DATE"]
        print(f"Creating summary for {data_date}")
        # In a real job, this would generate summary statistics
        print(f"Summary created successfully for {data_date}")