From dd6870b9801a4bad792ae1745726869a6adf5fa2 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Thu, 3 Jul 2025 14:21:01 -0700 Subject: [PATCH] Add plan for partition activity log access layer --- databuild/databuild.proto | 204 +++++++ plans/partition-activity-log-access-layer.md | 540 +++++++++++++++++++ 2 files changed, 744 insertions(+) create mode 100644 plans/partition-activity-log-access-layer.md diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 6d54998..04ea289 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -173,6 +173,199 @@ service DataBuildService { rpc Build(GraphBuildRequest) returns (GraphBuildResponse); } +/////////////////////////////////////////////////////////////////////////////////////////////// +// Partition Activity Log +/////////////////////////////////////////////////////////////////////////////////////////////// + +// Partition lifecycle states +enum PartitionStatus { + PARTITION_UNKNOWN = 0; + PARTITION_REQUESTED = 1; // Partition requested but not yet scheduled + PARTITION_SCHEDULED = 2; // Job scheduled to produce this partition + PARTITION_BUILDING = 3; // Job actively building this partition + PARTITION_AVAILABLE = 4; // Partition successfully built and available + PARTITION_FAILED = 5; // Partition build failed + PARTITION_STALE = 6; // Partition exists but upstream dependencies changed + PARTITION_DELEGATED = 7; // Request delegated to existing build +} + +// Individual partition activity event +message PartitionEvent { + // Event identity + string partition_event_id = 1; + google.protobuf.Timestamp timestamp = 2; + + // Partition identification + PartitionRef partition_ref = 3; + PartitionStatus status = 4; + + // Build context + string job_graph_run_id = 5; // Links to graph execution + string job_run_id = 6; // Links to specific job run + JobLabel producing_job = 7; // Which job produces this partition + + // Coordination metadata + repeated string requesting_clients = 8; // Who requested this partition + string delegated_to_run_id = 9; // If delegated, which run + + // Dependencies + repeated PartitionRef upstream_deps = 10; + repeated PartitionRef downstream_deps = 11; + + // Data about the partition + PartitionManifest manifest = 12; // Present when status = AVAILABLE + string failure_reason = 13; // Present when status = FAILED + + // Storage metadata + string storage_backend = 14; + map storage_metadata = 15; +} + +// Query capabilities +message PartitionEventQuery { + repeated PartitionRef partition_refs = 1; + repeated PartitionStatus statuses = 2; + repeated string job_graph_run_ids = 3; + TimeRange time_range = 4; + int32 limit = 5; + int32 offset = 6; + OrderBy order_by = 7; +} + +message TimeRange { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; +} + +message OrderBy { + string field = 1; + bool ascending = 2; +} + +// Stream filtering +message EventStreamFilter { + repeated PartitionRef partition_refs = 1; + repeated PartitionStatus statuses = 2; + google.protobuf.Timestamp since = 3; +} + +// Coordination support +message ActiveBuild { + PartitionRef partition_ref = 1; + string job_graph_run_id = 2; + PartitionStatus status = 3; + repeated string requesting_clients = 4; + google.protobuf.Timestamp started_at = 5; +} + +message DependencyGraph { + PartitionRef root = 1; + repeated DependencyNode nodes = 2; + repeated DependencyEdge edges = 3; +} + +message DependencyNode { + PartitionRef partition_ref = 1; + PartitionStatus status = 2; + google.protobuf.Timestamp last_updated = 3; +} + +message DependencyEdge { + PartitionRef source = 1; + PartitionRef target = 2; + DependencyType type = 3; +} + +enum DependencyType { + DEPENDS_ON = 0; + PRODUCES = 1; +} + +// Request/Response message types +message AppendPartitionEventRequest { + PartitionEvent event = 1; +} + +message AppendPartitionEventResponse { + string event_id = 1; +} + +message GetLatestPartitionEventRequest { + PartitionRef partition_ref = 1; +} + +message GetLatestPartitionEventResponse { + PartitionEvent event = 1; +} + +message QueryPartitionEventsRequest { + PartitionEventQuery query = 1; +} + +message QueryPartitionEventsResponse { + repeated PartitionEvent events = 1; + int64 total_count = 2; +} + +message StreamPartitionEventsRequest { + EventStreamFilter filter = 1; +} + +message RequestPartitionRequest { + PartitionRef partition_ref = 1; + string client_id = 2; + RequestOptions options = 3; +} + +message RequestOptions { + bool allow_delegation = 1; + int32 timeout_seconds = 2; +} + +message RequestPartitionResponse { + RequestResult result = 1; +} + +message RequestResult { + oneof result { + PartitionLocation available = 1; + DelegationToken delegated = 2; + BuildToken building = 3; + } +} + +message PartitionLocation { + PartitionRef partition_ref = 1; + PartitionManifest manifest = 2; +} + +message DelegationToken { + string job_graph_run_id = 1; + google.protobuf.Timestamp estimated_completion = 2; +} + +message BuildToken { + string job_graph_run_id = 1; + google.protobuf.Timestamp started_at = 2; +} + +message GetActiveBuildStatusRequest { + repeated PartitionRef partition_refs = 1; +} + +message GetActiveBuildStatusResponse { + repeated ActiveBuild active_builds = 1; +} + +message GetDependencyGraphRequest { + PartitionRef partition_ref = 1; + int32 depth = 2; +} + +message GetDependencyGraphResponse { + DependencyGraph graph = 1; +} + /////////////////////////////////////////////////////////////////////////////////////////////// // Catalog /////////////////////////////////////////////////////////////////////////////////////////////// @@ -305,4 +498,15 @@ service Catalog { // Enables lookup of partition manifests produced as part of prior job runs rpc ListPartitions(ListPartitionManifestsRequest) returns (ListPartitionManifestsResponse); + + // Partition activity log methods + rpc AppendPartitionEvent(AppendPartitionEventRequest) returns (AppendPartitionEventResponse); + rpc GetLatestPartitionEvent(GetLatestPartitionEventRequest) returns (GetLatestPartitionEventResponse); + rpc QueryPartitionEvents(QueryPartitionEventsRequest) returns (QueryPartitionEventsResponse); + rpc StreamPartitionEvents(StreamPartitionEventsRequest) returns (stream PartitionEvent); + + // Coordination methods + rpc RequestPartition(RequestPartitionRequest) returns (RequestPartitionResponse); + rpc GetActiveBuildStatus(GetActiveBuildStatusRequest) returns (GetActiveBuildStatusResponse); + rpc GetDependencyGraph(GetDependencyGraphRequest) returns (GetDependencyGraphResponse); } diff --git a/plans/partition-activity-log-access-layer.md b/plans/partition-activity-log-access-layer.md new file mode 100644 index 0000000..e9c4fb3 --- /dev/null +++ b/plans/partition-activity-log-access-layer.md @@ -0,0 +1,540 @@ +# Partition Activity Log Access Layer Design + +## Overview + +This document details the implementation of a swappable datastore access layer for the Partition Activity Log, enabling support for SQLite, PostgreSQL, and Delta Lake backends while maintaining consistent semantics and performance characteristics. + +## Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ DataBuild Core │ +├─────────────────────────────────────────────────────────────┤ +│ Activity Log Service │ +├─────────────────────────────────────────────────────────────┤ +│ Abstract Storage Interface │ +├─────────────────────────────────────────────────────────────┤ +│ SQLite Impl │ PostgreSQL Impl │ Delta Lake Impl │ +└─────────────────────────────────────────────────────────────┘ +``` + +## 1. Abstract Storage Interface + +### Core Trait Definition + +```rust +pub trait PartitionActivityStore: Send + Sync { + type Error: std::error::Error + Send + Sync + 'static; + + // Event operations + async fn append_event(&self, event: &PartitionEvent) -> Result<(), Self::Error>; + async fn get_latest_event(&self, partition_ref: &PartitionRef) -> Result, Self::Error>; + async fn get_events_since(&self, partition_ref: &PartitionRef, since: Timestamp) -> Result, Self::Error>; + + // Batch operations for performance + async fn append_events(&self, events: &[PartitionEvent]) -> Result<(), Self::Error>; + async fn get_latest_events(&self, partition_refs: &[PartitionRef]) -> Result>, Self::Error>; + + // Query operations + async fn query_events(&self, query: &PartitionEventQuery) -> Result, Self::Error>; + async fn count_events(&self, query: &PartitionEventQuery) -> Result; + + // Coordination queries + async fn get_active_builds(&self) -> Result, Self::Error>; + async fn get_dependency_graph(&self, partition_ref: &PartitionRef, depth: u32) -> Result; + + // Streaming for real-time updates + async fn stream_events(&self, filter: &EventStreamFilter) -> Result>, Self::Error>; + + // Maintenance operations + async fn vacuum(&self, before: Timestamp) -> Result; + async fn get_statistics(&self) -> Result; +} +``` + +### Query Types + +```rust +#[derive(Debug, Clone)] +pub struct PartitionEventQuery { + pub partition_refs: Option>, + pub statuses: Option>, + pub job_graph_run_ids: Option>, + pub time_range: Option<(Timestamp, Timestamp)>, + pub limit: Option, + pub offset: Option, + pub order_by: Option, +} + +#[derive(Debug, Clone)] +pub struct EventStreamFilter { + pub partition_refs: Option>, + pub statuses: Option>, + pub since: Option, +} + +#[derive(Debug, Clone)] +pub struct ActiveBuild { + pub partition_ref: PartitionRef, + pub job_graph_run_id: String, + pub status: PartitionStatus, + pub requesting_clients: Vec, + pub started_at: Timestamp, +} + +#[derive(Debug, Clone)] +pub struct DependencyGraph { + pub root: PartitionRef, + pub nodes: Vec, + pub edges: Vec, +} +``` + +## 2. Storage Implementation Strategies + +### A. SQLite Implementation + +**Use Case**: Single-node development, embedded deployments, testing +**Schema**: +```sql +CREATE TABLE partition_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + partition_event_id TEXT NOT NULL UNIQUE, + partition_ref TEXT NOT NULL, + status INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + job_graph_run_id TEXT, + job_run_id TEXT, + producing_job_label TEXT, + requesting_clients TEXT, -- JSON array + delegated_to_run_id TEXT, + upstream_deps TEXT, -- JSON array + downstream_deps TEXT, -- JSON array + manifest_data BLOB, -- Protobuf serialized + failure_reason TEXT, + + -- Indexes for performance + INDEX idx_partition_ref (partition_ref), + INDEX idx_timestamp (timestamp), + INDEX idx_job_graph_run_id (job_graph_run_id), + INDEX idx_status (status), + INDEX idx_partition_timestamp (partition_ref, timestamp DESC) +); + +-- Materialized view for latest events +CREATE TABLE partition_latest_events ( + partition_ref TEXT PRIMARY KEY, + latest_event_id INTEGER NOT NULL, + status INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + FOREIGN KEY (latest_event_id) REFERENCES partition_events(id) +); +``` + +**Implementation Highlights**: +- WAL mode for better concurrency +- Prepared statements for performance +- Periodic VACUUM for maintenance +- JSON functions for array queries + +### B. PostgreSQL Implementation + +**Use Case**: Production deployments, high concurrency, complex queries +**Schema**: +```sql +CREATE TABLE partition_events ( + id BIGSERIAL PRIMARY KEY, + partition_event_id UUID NOT NULL UNIQUE, + partition_ref TEXT NOT NULL, + status INTEGER NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + job_graph_run_id TEXT, + job_run_id TEXT, + producing_job_label TEXT, + requesting_clients JSONB, + delegated_to_run_id TEXT, + upstream_deps JSONB, + downstream_deps JSONB, + manifest_data BYTEA, + failure_reason TEXT, + + -- Optimized indexes + INDEX idx_partition_ref_timestamp ON partition_events USING BTREE (partition_ref, timestamp DESC), + INDEX idx_job_graph_run_id ON partition_events USING HASH (job_graph_run_id), + INDEX idx_status_timestamp ON partition_events USING BTREE (status, timestamp DESC), + INDEX idx_requesting_clients ON partition_events USING GIN (requesting_clients) +); + +-- Partitioning by time for large datasets +CREATE TABLE partition_events_y2024m01 PARTITION OF partition_events +FOR VALUES FROM ('2024-01-01') TO ('2024-02-01'); + +-- Materialized view with automatic refresh +CREATE MATERIALIZED VIEW partition_latest_events AS +SELECT DISTINCT ON (partition_ref) + partition_ref, id, status, timestamp +FROM partition_events +ORDER BY partition_ref, timestamp DESC; + +CREATE UNIQUE INDEX ON partition_latest_events (partition_ref); +``` + +**Implementation Highlights**: +- Connection pooling with deadpool-postgres +- JSONB for flexible array queries +- Partitioning by time for scalability +- Materialized views with refresh strategies +- Listen/Notify for real-time streaming + +### C. Delta Lake Implementation + +**Use Case**: Data lake environments, analytics workloads, historical analysis +**Schema**: +``` +partition_events/ +├── _delta_log/ +│ ├── 00000000000000000000.json +│ └── 00000000000000000001.json +└── part-00000-*.parquet +``` + +**Table Schema**: +```rust +Schema::new(vec![ + Field::new("partition_event_id", DataType::Utf8, false), + Field::new("partition_ref", DataType::Utf8, false), + Field::new("status", DataType::Int32, false), + Field::new("timestamp", DataType::Timestamp(TimeUnit::Microsecond, None), false), + Field::new("job_graph_run_id", DataType::Utf8, true), + Field::new("job_run_id", DataType::Utf8, true), + Field::new("producing_job_label", DataType::Utf8, true), + Field::new("requesting_clients", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true), + Field::new("delegated_to_run_id", DataType::Utf8, true), + Field::new("upstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true), + Field::new("downstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true), + Field::new("manifest_data", DataType::Binary, true), + Field::new("failure_reason", DataType::Utf8, true), +]) +``` + +**Implementation Highlights**: +- Partitioned by date for query performance +- Z-ordering on partition_ref for clustering +- Optimize operations for compaction +- DataFusion for complex analytical queries +- Streaming through Delta Lake's log tail + +## 3. Configuration and Factory Pattern + +### Configuration Schema + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum StoreConfig { + SQLite { + path: PathBuf, + pragma: HashMap, + }, + PostgreSQL { + connection_string: String, + pool_size: Option, + partition_strategy: Option, + }, + DeltaLake { + table_uri: String, + storage_options: HashMap, + partition_columns: Vec, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PartitionStrategy { + ByMonth, + ByWeek, + ByDay, +} +``` + +### Factory Implementation + +```rust +pub struct PartitionActivityStoreFactory; + +impl PartitionActivityStoreFactory { + pub async fn create(config: &StoreConfig) -> Result, Box> { + match config { + StoreConfig::SQLite { path, pragma } => { + let store = SQLitePartitionActivityStore::new(path, pragma).await?; + Ok(Box::new(store)) + } + StoreConfig::PostgreSQL { connection_string, pool_size, partition_strategy } => { + let store = PostgreSQLPartitionActivityStore::new( + connection_string, + pool_size.unwrap_or(10), + partition_strategy.clone() + ).await?; + Ok(Box::new(store)) + } + StoreConfig::DeltaLake { table_uri, storage_options, partition_columns } => { + let store = DeltaLakePartitionActivityStore::new( + table_uri, + storage_options.clone(), + partition_columns.clone() + ).await?; + Ok(Box::new(store)) + } + } + } +} +``` + +## 4. Protocol Buffer Extensions + +### New Message Types + +```protobuf +// Enhanced partition event with storage metadata +message PartitionEvent { + // Event identity + string partition_event_id = 1; + google.protobuf.Timestamp timestamp = 2; + + // Partition identification + PartitionRef partition_ref = 3; + PartitionStatus status = 4; + + // Build context + string job_graph_run_id = 5; + string job_run_id = 6; + JobLabel producing_job = 7; + + // Coordination metadata + repeated string requesting_clients = 8; + string delegated_to_run_id = 9; + + // Dependencies + repeated PartitionRef upstream_deps = 10; + repeated PartitionRef downstream_deps = 11; + + // Data about the partition + PartitionManifest manifest = 12; + string failure_reason = 13; + + // Storage metadata (added) + string storage_backend = 14; + map storage_metadata = 15; +} + +// Query capabilities +message PartitionEventQuery { + repeated PartitionRef partition_refs = 1; + repeated PartitionStatus statuses = 2; + repeated string job_graph_run_ids = 3; + TimeRange time_range = 4; + int32 limit = 5; + int32 offset = 6; + OrderBy order_by = 7; +} + +message TimeRange { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; +} + +message OrderBy { + string field = 1; + bool ascending = 2; +} + +// Stream filtering +message EventStreamFilter { + repeated PartitionRef partition_refs = 1; + repeated PartitionStatus statuses = 2; + google.protobuf.Timestamp since = 3; +} + +// Coordination support +message ActiveBuild { + PartitionRef partition_ref = 1; + string job_graph_run_id = 2; + PartitionStatus status = 3; + repeated string requesting_clients = 4; + google.protobuf.Timestamp started_at = 5; +} + +message DependencyGraph { + PartitionRef root = 1; + repeated DependencyNode nodes = 2; + repeated DependencyEdge edges = 3; +} + +message DependencyNode { + PartitionRef partition_ref = 1; + PartitionStatus status = 2; + google.protobuf.Timestamp last_updated = 3; +} + +message DependencyEdge { + PartitionRef source = 1; + PartitionRef target = 2; + DependencyType type = 3; +} + +enum DependencyType { + DEPENDS_ON = 0; + PRODUCES = 1; +} +``` + +### Enhanced Catalog Service + +```protobuf +service EnhancedCatalog { + // Existing catalog methods + rpc ListJobGraphRuns(ListJobGraphRunsRequest) returns (ListJobGraphRunsResponse); + rpc ListPartitions(ListPartitionManifestsRequest) returns (ListPartitionManifestsResponse); + + // New partition activity methods + rpc AppendPartitionEvent(AppendPartitionEventRequest) returns (AppendPartitionEventResponse); + rpc GetLatestPartitionEvent(GetLatestPartitionEventRequest) returns (GetLatestPartitionEventResponse); + rpc QueryPartitionEvents(QueryPartitionEventsRequest) returns (QueryPartitionEventsResponse); + rpc StreamPartitionEvents(StreamPartitionEventsRequest) returns (stream PartitionEvent); + + // Coordination methods + rpc RequestPartition(RequestPartitionRequest) returns (RequestPartitionResponse); + rpc GetActiveBuildStatus(GetActiveBuildStatusRequest) returns (GetActiveBuildStatusResponse); + rpc GetDependencyGraph(GetDependencyGraphRequest) returns (GetDependencyGraphResponse); +} + +// Request/Response message types +message AppendPartitionEventRequest { + PartitionEvent event = 1; +} + +message AppendPartitionEventResponse { + string event_id = 1; +} + +message GetLatestPartitionEventRequest { + PartitionRef partition_ref = 1; +} + +message GetLatestPartitionEventResponse { + PartitionEvent event = 1; +} + +message QueryPartitionEventsRequest { + PartitionEventQuery query = 1; +} + +message QueryPartitionEventsResponse { + repeated PartitionEvent events = 1; + int64 total_count = 2; +} + +message StreamPartitionEventsRequest { + EventStreamFilter filter = 1; +} + +message RequestPartitionRequest { + PartitionRef partition_ref = 1; + string client_id = 2; + RequestOptions options = 3; +} + +message RequestOptions { + bool allow_delegation = 1; + int32 timeout_seconds = 2; +} + +message RequestPartitionResponse { + RequestResult result = 1; +} + +message RequestResult { + oneof result { + PartitionLocation available = 1; + DelegationToken delegated = 2; + BuildToken building = 3; + } +} + +message PartitionLocation { + PartitionRef partition_ref = 1; + PartitionManifest manifest = 2; +} + +message DelegationToken { + string job_graph_run_id = 1; + google.protobuf.Timestamp estimated_completion = 2; +} + +message BuildToken { + string job_graph_run_id = 1; + google.protobuf.Timestamp started_at = 2; +} +``` + +## 5. Implementation Phases + +### Phase 1: Core Infrastructure +- Abstract trait definition +- SQLite implementation (simplest) +- Basic factory pattern +- Unit tests for storage operations + +### Phase 2: PostgreSQL Implementation +- Connection pooling setup +- Advanced indexing strategies +- Materialized view management +- Performance benchmarking + +### Phase 3: Delta Lake Implementation +- Delta Lake table setup +- Parquet serialization +- Query optimization +- Analytics integration + +### Phase 4: Integration & Testing +- End-to-end integration tests +- Performance comparison benchmarks +- Migration utilities between backends +- Production deployment guides + +## 6. Performance Considerations + +### Write Performance +- **SQLite**: Single-writer limitation, but excellent for development +- **PostgreSQL**: Excellent concurrent write performance with proper indexing +- **Delta Lake**: Batch writes preferred, automatic compaction + +### Read Performance +- **SQLite**: Fast for simple queries, limited by single-process nature +- **PostgreSQL**: Excellent with proper indexing and materialized views +- **Delta Lake**: Optimized for analytical queries, Z-ordering for point lookups + +### Storage Efficiency +- **SQLite**: Compact storage, limited by 281TB max size +- **PostgreSQL**: Efficient with proper partitioning, unlimited growth +- **Delta Lake**: Columnar storage, excellent compression, versioning overhead + +## 7. Migration Strategy + +### Cross-Backend Migration +```rust +pub async fn migrate_events( + source: &dyn PartitionActivityStore, + target: &dyn PartitionActivityStore, + batch_size: usize, + time_range: Option<(Timestamp, Timestamp)>, +) -> Result> { + // Implementation for moving events between stores +} +``` + +### Schema Evolution +- Forward-compatible protobuf changes +- Storage-specific schema migration scripts +- Version tracking in metadata + +This design provides a robust, scalable foundation for the partition activity log while maintaining flexibility for different deployment scenarios and performance requirements. \ No newline at end of file