Partition Activity Log Access Layer Design
Overview
This document details the implementation of a swappable datastore access layer for the Partition Activity Log. The layer supports SQLite, PostgreSQL, and Delta Lake backends behind one interface, keeping semantics consistent across backends while each backend retains its own performance profile.
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│                       DataBuild Core                        │
├─────────────────────────────────────────────────────────────┤
│                    Activity Log Service                     │
├─────────────────────────────────────────────────────────────┤
│                 Abstract Storage Interface                  │
├─────────────────────────────────────────────────────────────┤
│    SQLite Impl     │  PostgreSQL Impl  │  Delta Lake Impl   │
└─────────────────────────────────────────────────────────────┘
1. Abstract Storage Interface
Core Trait Definition
use std::pin::Pin;

use async_trait::async_trait;
use futures::Stream;

// `#[async_trait]` keeps the trait usable as `dyn PartitionActivityStore<Error = E>`;
// native `async fn` in traits is not dyn-compatible.
#[async_trait]
pub trait PartitionActivityStore: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    // Event operations
    async fn append_event(&self, event: &PartitionEvent) -> Result<(), Self::Error>;
    async fn get_latest_event(&self, partition_ref: &PartitionRef) -> Result<Option<PartitionEvent>, Self::Error>;
    async fn get_events_since(&self, partition_ref: &PartitionRef, since: Timestamp) -> Result<Vec<PartitionEvent>, Self::Error>;

    // Batch operations for performance
    async fn append_events(&self, events: &[PartitionEvent]) -> Result<(), Self::Error>;
    async fn get_latest_events(&self, partition_refs: &[PartitionRef]) -> Result<Vec<Option<PartitionEvent>>, Self::Error>;

    // Query operations
    async fn query_events(&self, query: &PartitionEventQuery) -> Result<Vec<PartitionEvent>, Self::Error>;
    async fn count_events(&self, query: &PartitionEventQuery) -> Result<u64, Self::Error>;

    // Coordination queries
    async fn get_active_builds(&self) -> Result<Vec<ActiveBuild>, Self::Error>;
    async fn get_dependency_graph(&self, partition_ref: &PartitionRef, depth: u32) -> Result<DependencyGraph, Self::Error>;

    // Streaming for real-time updates (pinned and `Send` so the stream can be polled across tasks)
    async fn stream_events(&self, filter: &EventStreamFilter) -> Result<Pin<Box<dyn Stream<Item = PartitionEvent> + Send>>, Self::Error>;

    // Maintenance operations
    async fn vacuum(&self, before: Timestamp) -> Result<u64, Self::Error>;
    async fn get_statistics(&self) -> Result<StoreStatistics, Self::Error>;
}
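To make the event-operation semantics concrete, here is a minimal in-memory sketch. It is a synchronous, heavily reduced analogue of the trait, not the planned implementation: `SimpleActivityStore`, `InMemoryStore`, and the stand-in `PartitionEvent` fields are hypothetical, and treating `since` as an inclusive bound is an assumption.

```rust
use std::collections::HashMap;

// Stand-in types for this sketch; the real ones come from the protobuf definitions.
pub type PartitionRef = String;
pub type Timestamp = i64;

#[derive(Debug, Clone, PartialEq)]
pub struct PartitionEvent {
    pub partition_ref: PartitionRef,
    pub status: i32,
    pub timestamp: Timestamp,
}

/// Synchronous, reduced analogue of `PartitionActivityStore` (hypothetical).
pub trait SimpleActivityStore {
    fn append_event(&mut self, event: PartitionEvent);
    fn get_latest_event(&self, partition_ref: &str) -> Option<&PartitionEvent>;
    fn get_events_since(&self, partition_ref: &str, since: Timestamp) -> Vec<&PartitionEvent>;
}

#[derive(Default)]
pub struct InMemoryStore {
    // Events per partition, kept in append order.
    events: HashMap<PartitionRef, Vec<PartitionEvent>>,
}

impl SimpleActivityStore for InMemoryStore {
    fn append_event(&mut self, event: PartitionEvent) {
        self.events
            .entry(event.partition_ref.clone())
            .or_default()
            .push(event);
    }

    fn get_latest_event(&self, partition_ref: &str) -> Option<&PartitionEvent> {
        // "Latest" is the highest timestamp, which may differ from append order.
        self.events
            .get(partition_ref)?
            .iter()
            .max_by_key(|e| e.timestamp)
    }

    fn get_events_since(&self, partition_ref: &str, since: Timestamp) -> Vec<&PartitionEvent> {
        // Assumption: `since` is inclusive.
        self.events
            .get(partition_ref)
            .map(|v| v.iter().filter(|e| e.timestamp >= since).collect())
            .unwrap_or_default()
    }
}
```

A store like this could also serve as a reference model in unit tests that compare backend behaviour.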
Query Types
#[derive(Debug, Clone)]
pub struct PartitionEventQuery {
    pub partition_refs: Option<Vec<PartitionRef>>,
    pub statuses: Option<Vec<PartitionStatus>>,
    pub job_graph_run_ids: Option<Vec<String>>,
    pub time_range: Option<(Timestamp, Timestamp)>,
    pub limit: Option<u32>,
    pub offset: Option<u32>,
    pub order_by: Option<OrderBy>,
}

#[derive(Debug, Clone)]
pub struct EventStreamFilter {
    pub partition_refs: Option<Vec<PartitionRef>>,
    pub statuses: Option<Vec<PartitionStatus>>,
    pub since: Option<Timestamp>,
}

#[derive(Debug, Clone)]
pub struct ActiveBuild {
    pub partition_ref: PartitionRef,
    pub job_graph_run_id: String,
    pub status: PartitionStatus,
    pub requesting_clients: Vec<String>,
    pub started_at: Timestamp,
}

#[derive(Debug, Clone)]
pub struct DependencyGraph {
    pub root: PartitionRef,
    pub nodes: Vec<DependencyNode>,
    pub edges: Vec<DependencyEdge>,
}
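The optional fields of `PartitionEventQuery` read naturally as "no constraint when `None`". The sketch below shows one way a backend might evaluate such a query over events already in memory. `Event`, `Query`, and `run_query` are reduced, hypothetical stand-ins, and two choices are assumptions rather than documented behaviour: the time range is half-open, and results default to newest first.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub partition_ref: String,
    pub status: i32,
    pub timestamp: i64,
}

/// Reduced stand-in for `PartitionEventQuery`.
#[derive(Debug, Clone, Default)]
pub struct Query {
    pub partition_refs: Option<Vec<String>>,
    pub statuses: Option<Vec<i32>>,
    pub time_range: Option<(i64, i64)>,
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}

/// A `None` filter places no constraint on the event.
fn event_matches(q: &Query, e: &Event) -> bool {
    q.partition_refs
        .as_ref()
        .map_or(true, |refs| refs.contains(&e.partition_ref))
        && q.statuses.as_ref().map_or(true, |s| s.contains(&e.status))
        // Assumption: the range is half-open, [start, end).
        && q.time_range
            .map_or(true, |(start, end)| e.timestamp >= start && e.timestamp < end)
}

pub fn run_query(q: &Query, events: &[Event]) -> Vec<Event> {
    let mut hits: Vec<Event> = events
        .iter()
        .filter(|e| event_matches(q, e))
        .cloned()
        .collect();
    // Assumption: newest first when no explicit ordering is requested.
    hits.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
    // Pagination is applied after filtering and ordering.
    hits.into_iter()
        .skip(q.offset.unwrap_or(0) as usize)
        .take(q.limit.map_or(usize::MAX, |l| l as usize))
        .collect()
}
```

A SQL backend would translate the same fields into `WHERE`, `ORDER BY`, `LIMIT`, and `OFFSET` clauses instead of filtering in memory.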
2. Storage Implementation Strategies
A. SQLite Implementation
Use Case: Single-node development, embedded deployments, testing
Schema:
CREATE TABLE partition_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    partition_event_id TEXT NOT NULL UNIQUE,
    partition_ref TEXT NOT NULL,
    status INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    job_graph_run_id TEXT,
    job_run_id TEXT,
    producing_job_label TEXT,
    requesting_clients TEXT, -- JSON array
    delegated_to_run_id TEXT,
    upstream_deps TEXT, -- JSON array
    downstream_deps TEXT, -- JSON array
    manifest_data BLOB, -- Protobuf serialized
    failure_reason TEXT
);

-- Indexes for performance (SQLite requires separate CREATE INDEX statements)
CREATE INDEX idx_partition_ref ON partition_events (partition_ref);
CREATE INDEX idx_timestamp ON partition_events (timestamp);
CREATE INDEX idx_job_graph_run_id ON partition_events (job_graph_run_id);
CREATE INDEX idx_status ON partition_events (status);
CREATE INDEX idx_partition_timestamp ON partition_events (partition_ref, timestamp DESC);

-- Latest event per partition. SQLite has no materialized views, so this is a
-- regular table that the store updates whenever it appends an event.
CREATE TABLE partition_latest_events (
    partition_ref TEXT PRIMARY KEY,
    latest_event_id INTEGER NOT NULL,
    status INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    FOREIGN KEY (latest_event_id) REFERENCES partition_events(id)
);
Implementation Highlights:
- WAL mode for better concurrency
- Prepared statements for performance
- Periodic VACUUM for maintenance
- JSON functions for array queries
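As an illustration of the WAL-mode highlight, the sketch below renders a pragma map (like the `pragma` field of the SQLite store configuration) into `PRAGMA` statements to run when a connection opens. `pragma_statements` is a hypothetical helper. It uses a `BTreeMap` instead of the configuration's `HashMap` only to get a deterministic statement order, and the character whitelist is an assumed safety measure, since pragma names and values cannot be bound as SQL parameters.

```rust
use std::collections::BTreeMap;

/// Render a pragma map as `PRAGMA name = value;` statements.
/// Names and values are restricted to a small character set so that
/// configuration input cannot smuggle in additional SQL.
pub fn pragma_statements(pragma: &BTreeMap<String, String>) -> Result<Vec<String>, String> {
    let safe_name =
        |s: &str| !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_');
    let safe_value = |s: &str| {
        !s.is_empty()
            && s.chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
    };
    pragma
        .iter()
        .map(|(name, value)| {
            if safe_name(name) && safe_value(value) {
                Ok(format!("PRAGMA {name} = {value};"))
            } else {
                Err(format!("invalid pragma: {name}={value}"))
            }
        })
        .collect()
}
```

With `journal_mode` set to `WAL` and `synchronous` set to `NORMAL`, this yields two statements in key order; a value containing a semicolon or whitespace is rejected.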
B. PostgreSQL Implementation
Use Case: Production deployments, high concurrency, complex queries
Schema:
CREATE TABLE partition_events (
    id BIGSERIAL,
    partition_event_id UUID NOT NULL,
    partition_ref TEXT NOT NULL,
    status INTEGER NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    job_graph_run_id TEXT,
    job_run_id TEXT,
    producing_job_label TEXT,
    requesting_clients JSONB,
    delegated_to_run_id TEXT,
    upstream_deps JSONB,
    downstream_deps JSONB,
    manifest_data BYTEA,
    failure_reason TEXT,
    -- On a partitioned table, primary key and unique constraints must include the partition key
    PRIMARY KEY (id, timestamp),
    UNIQUE (partition_event_id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Optimized indexes (PostgreSQL requires separate CREATE INDEX statements)
CREATE INDEX idx_partition_ref_timestamp ON partition_events USING BTREE (partition_ref, timestamp DESC);
CREATE INDEX idx_job_graph_run_id ON partition_events USING HASH (job_graph_run_id);
CREATE INDEX idx_status_timestamp ON partition_events USING BTREE (status, timestamp DESC);
CREATE INDEX idx_requesting_clients ON partition_events USING GIN (requesting_clients);

-- Partitioning by time for large datasets
CREATE TABLE partition_events_y2024m01 PARTITION OF partition_events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Materialized view of the latest event per partition. PostgreSQL does not refresh
-- materialized views automatically; the store must run REFRESH MATERIALIZED VIEW
-- (CONCURRENTLY relies on the unique index below).
CREATE MATERIALIZED VIEW partition_latest_events AS
SELECT DISTINCT ON (partition_ref)
    partition_ref, id, status, timestamp
FROM partition_events
ORDER BY partition_ref, timestamp DESC;

CREATE UNIQUE INDEX ON partition_latest_events (partition_ref);
Implementation Highlights:
- Connection pooling with deadpool-postgres
- JSONB for flexible array queries
- Partitioning by time for scalability
- Materialized views with refresh strategies
- Listen/Notify for real-time streaming
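Time-based partitioning means new partitions have to be created ahead of incoming events. The sketch below generates the DDL for one monthly partition, following the naming used in the schema (`partition_events_y2024m01`). `monthly_partition_ddl` is a hypothetical helper; it only covers the monthly case and does not execute anything.

```rust
/// Build the DDL for one monthly partition of `partition_events`.
/// Returns `None` when `month` is outside 1..=12.
pub fn monthly_partition_ddl(year: i32, month: u32) -> Option<String> {
    if !(1..=12).contains(&month) {
        return None;
    }
    // The upper bound of a range partition is exclusive, so it is the first
    // day of the following month; December rolls over into the next year.
    let (next_year, next_month) = if month == 12 {
        (year + 1, 1)
    } else {
        (year, month + 1)
    };
    Some(format!(
        "CREATE TABLE partition_events_y{year:04}m{month:02} PARTITION OF partition_events \
         FOR VALUES FROM ('{year:04}-{month:02}-01') TO ('{next_year:04}-{next_month:02}-01');"
    ))
}
```

A scheduled task could call this for the next month or two and run the statement with `IF NOT EXISTS` semantics of its choosing; that scheduling is outside this sketch.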
C. Delta Lake Implementation
Use Case: Data lake environments, analytics workloads, historical analysis
Schema:
partition_events/
├── _delta_log/
│ ├── 00000000000000000000.json
│ └── 00000000000000000001.json
└── part-00000-*.parquet
Table Schema:
Schema::new(vec![
    Field::new("partition_event_id", DataType::Utf8, false),
    Field::new("partition_ref", DataType::Utf8, false),
    Field::new("status", DataType::Int32, false),
    Field::new("timestamp", DataType::Timestamp(TimeUnit::Microsecond, None), false),
    Field::new("job_graph_run_id", DataType::Utf8, true),
    Field::new("job_run_id", DataType::Utf8, true),
    Field::new("producing_job_label", DataType::Utf8, true),
    Field::new("requesting_clients", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("delegated_to_run_id", DataType::Utf8, true),
    Field::new("upstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("downstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("manifest_data", DataType::Binary, true),
    Field::new("failure_reason", DataType::Utf8, true),
])
Implementation Highlights:
- Partitioned by date for query performance
- Z-ordering on partition_ref for clustering
- Optimize operations for compaction
- DataFusion for complex analytical queries
- Streaming through Delta Lake's log tail
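Partitioning by date requires deriving a date value from the microsecond `timestamp` column. The sketch below does this with the standard library only, using Howard Hinnant's well-known `civil_from_days` algorithm for the calendar conversion. The partition column name `date` and the UTC interpretation of the timestamp are assumptions, and `date_partition` is a hypothetical helper.

```rust
/// Convert days since 1970-01-01 into (year, month, day) in the proleptic
/// Gregorian calendar (Howard Hinnant's `civil_from_days`).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Hive-style partition value for an event timestamp in microseconds since
/// the Unix epoch. Assumption: timestamps are UTC and the column is `date`.
pub fn date_partition(timestamp_micros: i64) -> String {
    const MICROS_PER_DAY: i64 = 86_400_000_000;
    // Euclidean division keeps pre-1970 timestamps on the correct day.
    let days = timestamp_micros.div_euclid(MICROS_PER_DAY);
    let (y, m, d) = civil_from_days(days);
    format!("date={y:04}-{m:02}-{d:02}")
}
```

In practice a date library such as `chrono` would likely replace the hand-written calendar arithmetic; it is spelled out here only to keep the example dependency-free.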
3. Configuration and Factory Pattern
Configuration Schema
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StoreConfig {
    SQLite {
        path: PathBuf,
        pragma: HashMap<String, String>,
    },
    PostgreSQL {
        connection_string: String,
        pool_size: Option<u32>,
        partition_strategy: Option<PartitionStrategy>,
    },
    DeltaLake {
        table_uri: String,
        storage_options: HashMap<String, String>,
        partition_columns: Vec<String>,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    ByMonth,
    ByWeek,
    ByDay,
}
Factory Implementation
pub struct PartitionActivityStoreFactory;

impl PartitionActivityStoreFactory {
    // A trait object must name the associated `Error` type. `StoreError` is an
    // assumed error type shared by all backends; it is not defined in this document.
    pub async fn create(
        config: &StoreConfig,
    ) -> Result<Box<dyn PartitionActivityStore<Error = StoreError>>, Box<dyn std::error::Error>> {
        match config {
            StoreConfig::SQLite { path, pragma } => {
                let store = SQLitePartitionActivityStore::new(path, pragma).await?;
                Ok(Box::new(store))
            }
            StoreConfig::PostgreSQL { connection_string, pool_size, partition_strategy } => {
                let store = PostgreSQLPartitionActivityStore::new(
                    connection_string,
                    pool_size.unwrap_or(10),
                    partition_strategy.clone(),
                ).await?;
                Ok(Box::new(store))
            }
            StoreConfig::DeltaLake { table_uri, storage_options, partition_columns } => {
                let store = DeltaLakePartitionActivityStore::new(
                    table_uri,
                    storage_options.clone(),
                    partition_columns.clone(),
                ).await?;
                Ok(Box::new(store))
            }
        }
    }
}
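The factory's core idea, one configuration enum dispatching to backend types behind a boxed trait object, can be shown without any database dependencies. The following is a reduced, synchronous sketch: `Config`, `Store`, `SqliteStore`, and `PostgresStore` are hypothetical stand-ins, and the empty-connection-string check is an assumed example of construction failing.

```rust
use std::error::Error;
use std::path::PathBuf;

/// Reduced stand-in for `StoreConfig`.
pub enum Config {
    SQLite { path: PathBuf },
    PostgreSQL { connection_string: String, pool_size: Option<u32> },
}

/// Reduced stand-in for the store trait. It has no associated types or
/// generic methods, so `Box<dyn Store>` is allowed.
pub trait Store {
    fn backend(&self) -> &'static str;
    fn pool_size(&self) -> u32;
}

struct SqliteStore {
    _path: PathBuf,
}

struct PostgresStore {
    _connection_string: String,
    pool_size: u32,
}

impl Store for SqliteStore {
    fn backend(&self) -> &'static str {
        "sqlite"
    }
    fn pool_size(&self) -> u32 {
        1 // this sketch models SQLite as a single connection
    }
}

impl Store for PostgresStore {
    fn backend(&self) -> &'static str {
        "postgres"
    }
    fn pool_size(&self) -> u32 {
        self.pool_size
    }
}

/// Pick the backend from the configuration and hide it behind the trait.
pub fn create(config: &Config) -> Result<Box<dyn Store>, Box<dyn Error>> {
    match config {
        Config::SQLite { path } => Ok(Box::new(SqliteStore { _path: path.clone() })),
        Config::PostgreSQL { connection_string, pool_size } => {
            if connection_string.is_empty() {
                return Err("empty connection string".into());
            }
            Ok(Box::new(PostgresStore {
                _connection_string: connection_string.clone(),
                // Same default as the factory in this document.
                pool_size: pool_size.unwrap_or(10),
            }))
        }
    }
}
```

Callers only see `Box<dyn Store>`, so switching backends is a configuration change rather than a code change.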
4. Protocol Buffer Extensions
New Message Types
// Enhanced partition event with storage metadata
message PartitionEvent {
  // Event identity
  string partition_event_id = 1;
  google.protobuf.Timestamp timestamp = 2;

  // Partition identification
  PartitionRef partition_ref = 3;
  PartitionStatus status = 4;

  // Build context
  string job_graph_run_id = 5;
  string job_run_id = 6;
  JobLabel producing_job = 7;

  // Coordination metadata
  repeated string requesting_clients = 8;
  string delegated_to_run_id = 9;

  // Dependencies
  repeated PartitionRef upstream_deps = 10;
  repeated PartitionRef downstream_deps = 11;

  // Data about the partition
  PartitionManifest manifest = 12;
  string failure_reason = 13;

  // Storage metadata (added)
  string storage_backend = 14;
  map<string, string> storage_metadata = 15;
}

// Query capabilities
message PartitionEventQuery {
  repeated PartitionRef partition_refs = 1;
  repeated PartitionStatus statuses = 2;
  repeated string job_graph_run_ids = 3;
  TimeRange time_range = 4;
  int32 limit = 5;
  int32 offset = 6;
  OrderBy order_by = 7;
}

message TimeRange {
  google.protobuf.Timestamp start = 1;
  google.protobuf.Timestamp end = 2;
}

message OrderBy {
  string field = 1;
  bool ascending = 2;
}

// Stream filtering
message EventStreamFilter {
  repeated PartitionRef partition_refs = 1;
  repeated PartitionStatus statuses = 2;
  google.protobuf.Timestamp since = 3;
}

// Coordination support
message ActiveBuild {
  PartitionRef partition_ref = 1;
  string job_graph_run_id = 2;
  PartitionStatus status = 3;
  repeated string requesting_clients = 4;
  google.protobuf.Timestamp started_at = 5;
}

message DependencyGraph {
  PartitionRef root = 1;
  repeated DependencyNode nodes = 2;
  repeated DependencyEdge edges = 3;
}

message DependencyNode {
  PartitionRef partition_ref = 1;
  PartitionStatus status = 2;
  google.protobuf.Timestamp last_updated = 3;
}

message DependencyEdge {
  PartitionRef source = 1;
  PartitionRef target = 2;
  DependencyType type = 3;
}

enum DependencyType {
  DEPENDS_ON = 0;
  PRODUCES = 1;
}
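The protobuf `PartitionEventQuery` carries `limit` and `offset` as plain `int32` fields, while the Rust query type uses `Option<u32>`. Assuming proto3 semantics, where a scalar field without `optional` cannot distinguish "unset" from zero, a conversion along these lines may be needed at the service boundary. `WirePaging`, `Paging`, and `paging_from_wire` are hypothetical names; treating zero as "not set" is an assumption.

```rust
/// Pagination fields as they arrive on the wire.
#[derive(Debug, Clone, Copy)]
pub struct WirePaging {
    pub limit: i32,
    pub offset: i32,
}

/// Pagination fields as the Rust query type models them.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Paging {
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}

/// Map wire values to optional values: zero means "not set" (assumption),
/// positive values are kept, negative values are rejected.
pub fn paging_from_wire(wire: &WirePaging) -> Result<Paging, String> {
    let convert = |name: &str, value: i32| -> Result<Option<u32>, String> {
        match value {
            0 => Ok(None),
            v if v > 0 => Ok(Some(v as u32)),
            v => Err(format!("{name} must not be negative, got {v}")),
        }
    };
    Ok(Paging {
        limit: convert("limit", wire.limit)?,
        offset: convert("offset", wire.offset)?,
    })
}
```

Declaring the fields as `uint32` or `optional` in the schema would be an alternative way to avoid the ambiguity.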
Enhanced Catalog Service
service EnhancedCatalog {
  // Existing catalog methods
  rpc ListJobGraphRuns(ListJobGraphRunsRequest) returns (ListJobGraphRunsResponse);
  rpc ListPartitions(ListPartitionManifestsRequest) returns (ListPartitionManifestsResponse);

  // New partition activity methods
  rpc AppendPartitionEvent(AppendPartitionEventRequest) returns (AppendPartitionEventResponse);
  rpc GetLatestPartitionEvent(GetLatestPartitionEventRequest) returns (GetLatestPartitionEventResponse);
  rpc QueryPartitionEvents(QueryPartitionEventsRequest) returns (QueryPartitionEventsResponse);
  rpc StreamPartitionEvents(StreamPartitionEventsRequest) returns (stream PartitionEvent);

  // Coordination methods
  rpc RequestPartition(RequestPartitionRequest) returns (RequestPartitionResponse);
  rpc GetActiveBuildStatus(GetActiveBuildStatusRequest) returns (GetActiveBuildStatusResponse);
  rpc GetDependencyGraph(GetDependencyGraphRequest) returns (GetDependencyGraphResponse);
}

// Request/Response message types
message AppendPartitionEventRequest {
  PartitionEvent event = 1;
}

message AppendPartitionEventResponse {
  string event_id = 1;
}

message GetLatestPartitionEventRequest {
  PartitionRef partition_ref = 1;
}

message GetLatestPartitionEventResponse {
  PartitionEvent event = 1;
}

message QueryPartitionEventsRequest {
  PartitionEventQuery query = 1;
}

message QueryPartitionEventsResponse {
  repeated PartitionEvent events = 1;
  int64 total_count = 2;
}

message StreamPartitionEventsRequest {
  EventStreamFilter filter = 1;
}

message RequestPartitionRequest {
  PartitionRef partition_ref = 1;
  string client_id = 2;
  RequestOptions options = 3;
}

message RequestOptions {
  bool allow_delegation = 1;
  int32 timeout_seconds = 2;
}

message RequestPartitionResponse {
  RequestResult result = 1;
}

message RequestResult {
  oneof result {
    PartitionLocation available = 1;
    DelegationToken delegated = 2;
    BuildToken building = 3;
  }
}

message PartitionLocation {
  PartitionRef partition_ref = 1;
  PartitionManifest manifest = 2;
}

message DelegationToken {
  string job_graph_run_id = 1;
  google.protobuf.Timestamp estimated_completion = 2;
}

message BuildToken {
  string job_graph_run_id = 1;
  google.protobuf.Timestamp started_at = 2;
}
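The three arms of `RequestResult` suggest a decision based on the partition's latest event and the caller's `allow_delegation` option. The document does not spell out that logic, so the sketch below is one plausible reading only: the `Status` variants, the `resolve_request` function, and the rule "delegate to an in-flight build when allowed, otherwise start a new build" are all assumptions.

```rust
/// Hypothetical subset of partition statuses; the real `PartitionStatus`
/// enum is defined elsewhere.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Requested,
    Building,
    Available,
    Failed,
}

/// Reduced mirror of the `RequestResult` oneof.
#[derive(Debug, PartialEq)]
pub enum RequestResult {
    Available,
    Delegated { job_graph_run_id: String },
    Building { job_graph_run_id: String },
}

/// Decide how to answer a partition request.
/// `latest` is the status and run id of the partition's latest event, if any;
/// `new_run_id` identifies the build that is started when nothing can be reused.
pub fn resolve_request(
    latest: Option<(Status, &str)>,
    allow_delegation: bool,
    new_run_id: &str,
) -> RequestResult {
    match latest {
        // The partition already exists: hand back its location.
        Some((Status::Available, _)) => RequestResult::Available,
        // Another run is building it and the client accepts waiting on that run.
        Some((Status::Building, run_id)) if allow_delegation => RequestResult::Delegated {
            job_graph_run_id: run_id.to_string(),
        },
        // No usable state (never built, failed, or delegation declined): build it.
        _ => RequestResult::Building {
            job_graph_run_id: new_run_id.to_string(),
        },
    }
}
```

A real implementation would also append the corresponding events and record the client in `requesting_clients`; those side effects are left out here.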
5. Implementation Phases
Phase 1: Core Infrastructure
- Abstract trait definition
- SQLite implementation (simplest)
- Basic factory pattern
- Unit tests for storage operations
Phase 2: PostgreSQL Implementation
- Connection pooling setup
- Advanced indexing strategies
- Materialized view management
- Performance benchmarking
Phase 3: Delta Lake Implementation
- Delta Lake table setup
- Parquet serialization
- Query optimization
- Analytics integration
Phase 4: Integration & Testing
- End-to-end integration tests
- Performance comparison benchmarks
- Migration utilities between backends
- Production deployment guides
6. Performance Considerations
Write Performance
- SQLite: Single-writer limitation, but excellent for development
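- PostgreSQL: Good concurrent write throughput through MVCC and row-level locking; each additional index adds write cost
- Delta Lake: Batch writes preferred, since every commit adds data files and a log entry; small files are merged by periodic optimize (compaction) runs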
Read Performance
- SQLite: Fast for simple queries, limited to a single node because the database is an embedded local file
- PostgreSQL: Excellent with proper indexing and materialized views
- Delta Lake: Optimized for analytical queries, Z-ordering for point lookups
Storage Efficiency
- SQLite: Compact storage, limited by 281TB max size
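- PostgreSQL: Efficient with proper partitioning; no fixed database size limit, though a single table or partition is capped at 32 TB with the default 8 kB block size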
- Delta Lake: Columnar storage, excellent compression, versioning overhead
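The 281 TB figure for SQLite follows from its file format limits: in recent SQLite releases the maximum page count is 4294967294, and the largest supported page size is 65536 bytes. The short sketch below reproduces the arithmetic; the function names are illustrative.

```rust
/// Upper bounds from SQLite's documented limits (recent releases).
pub const MAX_PAGE_COUNT: u64 = 4_294_967_294; // 2^32 - 2 pages
pub const MAX_PAGE_SIZE: u64 = 65_536; // bytes per page

/// Largest possible database file in bytes.
pub fn max_sqlite_bytes() -> u64 {
    MAX_PAGE_COUNT * MAX_PAGE_SIZE
}

/// The same limit in whole decimal terabytes (10^12 bytes).
pub fn max_sqlite_terabytes() -> u64 {
    max_sqlite_bytes() / 1_000_000_000_000
}
```

The limit only applies at the maximum page size; with the default 4096-byte page the ceiling is sixteen times lower.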
7. Migration Strategy
Cross-Backend Migration
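pub async fn migrate_events<S, T>(
    source: &S,
    target: &T,
    batch_size: usize,
    time_range: Option<(Timestamp, Timestamp)>,
) -> Result<u64, Box<dyn std::error::Error>>
where
    // Generic parameters avoid having to name each store's associated `Error` type.
    S: PartitionActivityStore + ?Sized,
    T: PartitionActivityStore + ?Sized,
{
    // Intended flow: page through `source` with `query_events` in chunks of
    // `batch_size`, write each chunk to `target` with `append_events`, and
    // return the number of events moved.
    todo!("implementation for moving events between stores")
}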
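The batching loop behind such a migration can be sketched with a reduced, synchronous store. Everything here is illustrative: `Event`, the `Store` trait with its `page` method, and `MemStore` are hypothetical stand-ins, and offset-based paging assumes the source is not written to while the migration runs.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub id: u64,
    pub timestamp: i64,
}

/// Reduced synchronous stand-in for the store trait.
pub trait Store {
    /// Events in a stable order (timestamp, then id), paged by offset and limit.
    fn page(&self, time_range: Option<(i64, i64)>, offset: usize, limit: usize) -> Vec<Event>;
    fn append_events(&mut self, events: &[Event]);
}

#[derive(Default)]
pub struct MemStore {
    events: Vec<Event>,
}

impl Store for MemStore {
    fn page(&self, time_range: Option<(i64, i64)>, offset: usize, limit: usize) -> Vec<Event> {
        let mut hits: Vec<Event> = self
            .events
            .iter()
            // Assumption: half-open range [start, end).
            .filter(|e| time_range.map_or(true, |(s, t)| e.timestamp >= s && e.timestamp < t))
            .cloned()
            .collect();
        hits.sort_by_key(|e| (e.timestamp, e.id));
        hits.into_iter().skip(offset).take(limit).collect()
    }

    fn append_events(&mut self, events: &[Event]) {
        self.events.extend_from_slice(events);
    }
}

/// Copy events from `source` to `target` in batches; returns the number copied.
pub fn migrate_events(
    source: &dyn Store,
    target: &mut dyn Store,
    batch_size: usize,
    time_range: Option<(i64, i64)>,
) -> Result<u64, String> {
    if batch_size == 0 {
        return Err("batch_size must be positive".to_string());
    }
    let mut migrated: u64 = 0;
    loop {
        // The running count doubles as the offset of the next batch.
        let batch = source.page(time_range, migrated as usize, batch_size);
        if batch.is_empty() {
            break;
        }
        target.append_events(&batch);
        migrated += batch.len() as u64;
    }
    Ok(migrated)
}
```

For large tables, paging by the last seen `(timestamp, id)` pair instead of an offset would avoid rescanning earlier rows on every batch.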
Schema Evolution
- Forward-compatible protobuf changes
- Storage-specific schema migration scripts
- Version tracking in metadata
This design provides a robust, scalable foundation for the partition activity log while maintaining flexibility for different deployment scenarios and performance requirements.