Partition Activity Log Access Layer Design

Overview

This document details the implementation of a swappable datastore access layer for the Partition Activity Log. The layer supports SQLite, PostgreSQL, and Delta Lake backends behind one interface with consistent semantics; performance characteristics differ by backend and are compared in section 6.

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                    DataBuild Core                           │
├─────────────────────────────────────────────────────────────┤
│              Activity Log Service                           │
├─────────────────────────────────────────────────────────────┤
│              Abstract Storage Interface                     │
├─────────────────────────────────────────────────────────────┤
│   SQLite Impl  │  PostgreSQL Impl  │  Delta Lake Impl       │
└─────────────────────────────────────────────────────────────┘

1. Abstract Storage Interface

Core Trait Definition

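// `#[async_trait]` (from the `async-trait` crate) boxes the returned futures so the
// trait can be used as a trait object, as the factory in section 3 requires.
#[async_trait::async_trait]
pub trait PartitionActivityStore: Send + Sync {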
    type Error: std::error::Error + Send + Sync + 'static;
    
    // Event operations
    async fn append_event(&self, event: &PartitionEvent) -> Result<(), Self::Error>;
    async fn get_latest_event(&self, partition_ref: &PartitionRef) -> Result<Option<PartitionEvent>, Self::Error>;
    async fn get_events_since(&self, partition_ref: &PartitionRef, since: Timestamp) -> Result<Vec<PartitionEvent>, Self::Error>;
    
    // Batch operations for performance
    async fn append_events(&self, events: &[PartitionEvent]) -> Result<(), Self::Error>;
    async fn get_latest_events(&self, partition_refs: &[PartitionRef]) -> Result<Vec<Option<PartitionEvent>>, Self::Error>;
    
    // Query operations
    async fn query_events(&self, query: &PartitionEventQuery) -> Result<Vec<PartitionEvent>, Self::Error>;
    async fn count_events(&self, query: &PartitionEventQuery) -> Result<u64, Self::Error>;
    
    // Coordination queries
    async fn get_active_builds(&self) -> Result<Vec<ActiveBuild>, Self::Error>;
    async fn get_dependency_graph(&self, partition_ref: &PartitionRef, depth: u32) -> Result<DependencyGraph, Self::Error>;
    
    // Streaming for real-time updates
    // The boxed stream is pinned so that callers can poll it.
    async fn stream_events(&self, filter: &EventStreamFilter) -> Result<std::pin::Pin<Box<dyn Stream<Item = PartitionEvent> + Send>>, Self::Error>;
    
    // Maintenance operations
    async fn vacuum(&self, before: Timestamp) -> Result<u64, Self::Error>;
    async fn get_statistics(&self) -> Result<StoreStatistics, Self::Error>;
}
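
The append and latest-event semantics of the trait can be illustrated with a small in-memory model. This is a minimal synchronous sketch, not the planned implementation: `Event` and `MemoryStore` are hypothetical stand-ins, timestamps are plain integers, and treating `since` as an inclusive bound is an assumption.

```rust
use std::collections::HashMap;

// Simplified stand-in for `PartitionEvent`; the real type carries more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub partition_ref: String,
    pub status: i32,
    pub timestamp: i64, // assumed: microseconds since the epoch
}

/// Hypothetical in-memory store: an append-only log plus a latest-event index.
#[derive(Default)]
pub struct MemoryStore {
    log: Vec<Event>,
    latest: HashMap<String, usize>, // partition_ref -> index into `log`
}

impl MemoryStore {
    /// Append one event; the index moves only if the event is at least as new
    /// as the one currently recorded for that partition.
    pub fn append_event(&mut self, event: Event) {
        let idx = self.log.len();
        let is_newest = match self.latest.get(&event.partition_ref) {
            Some(&prev) => self.log[prev].timestamp <= event.timestamp,
            None => true,
        };
        if is_newest {
            self.latest.insert(event.partition_ref.clone(), idx);
        }
        self.log.push(event);
    }

    pub fn get_latest_event(&self, partition_ref: &str) -> Option<&Event> {
        self.latest.get(partition_ref).map(|&i| &self.log[i])
    }

    /// Batch lookup: one result slot per requested ref, in request order.
    pub fn get_latest_events(&self, refs: &[&str]) -> Vec<Option<&Event>> {
        refs.iter().map(|r| self.get_latest_event(r)).collect()
    }

    /// Events for one partition with `timestamp >= since` (inclusive, assumed).
    pub fn get_events_since(&self, partition_ref: &str, since: i64) -> Vec<&Event> {
        self.log
            .iter()
            .filter(|e| e.partition_ref == partition_ref && e.timestamp >= since)
            .collect()
    }
}
```

Returning one `Option` per requested ref keeps the batch result aligned with the input slice, which matches the `Vec<Option<PartitionEvent>>` return type of `get_latest_events`.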

Query Types

#[derive(Debug, Clone)]
pub struct PartitionEventQuery {
    pub partition_refs: Option<Vec<PartitionRef>>,
    pub statuses: Option<Vec<PartitionStatus>>,
    pub job_graph_run_ids: Option<Vec<String>>,
    pub time_range: Option<(Timestamp, Timestamp)>,
    pub limit: Option<u32>,
    pub offset: Option<u32>,
    pub order_by: Option<OrderBy>,
}

#[derive(Debug, Clone)]
pub struct EventStreamFilter {
    pub partition_refs: Option<Vec<PartitionRef>>,
    pub statuses: Option<Vec<PartitionStatus>>,
    pub since: Option<Timestamp>,
}

#[derive(Debug, Clone)]
pub struct ActiveBuild {
    pub partition_ref: PartitionRef,
    pub job_graph_run_id: String,
    pub status: PartitionStatus,
    pub requesting_clients: Vec<String>,
    pub started_at: Timestamp,
}

#[derive(Debug, Clone)]
pub struct DependencyGraph {
    pub root: PartitionRef,
    pub nodes: Vec<DependencyNode>,
    pub edges: Vec<DependencyEdge>,
}
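
How the optional fields of `PartitionEventQuery` might combine can be sketched against an in-memory slice. The sketch rests on several assumptions that this document does not state: a `None` field means "no restriction", `time_range` is half-open, results default to ascending timestamp order, and `count_events` ignores `limit` and `offset`. `Event` and `Query` are reduced, hypothetical versions of the real types.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub partition_ref: String,
    pub status: i32,
    pub timestamp: i64,
}

// Reduced query: only the fields needed to show the filter semantics.
#[derive(Debug, Clone, Default)]
pub struct Query {
    pub partition_refs: Option<Vec<String>>,
    pub statuses: Option<Vec<i32>>,
    pub time_range: Option<(i64, i64)>, // assumed half-open: [start, end)
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}

/// An event matches when every filter that is set accepts it.
pub fn matches(q: &Query, e: &Event) -> bool {
    q.partition_refs.as_ref().map_or(true, |refs| refs.contains(&e.partition_ref))
        && q.statuses.as_ref().map_or(true, |s| s.contains(&e.status))
        && q.time_range.map_or(true, |(start, end)| e.timestamp >= start && e.timestamp < end)
}

/// Filter, order by timestamp, then apply offset and limit.
pub fn run_query(q: &Query, events: &[Event]) -> Vec<Event> {
    let mut hits: Vec<Event> = events.iter().filter(|e| matches(q, e)).cloned().collect();
    hits.sort_by_key(|e| e.timestamp);
    hits.into_iter()
        .skip(q.offset.unwrap_or(0) as usize)
        .take(q.limit.map_or(usize::MAX, |l| l as usize))
        .collect()
}

/// Total number of matches, independent of paging.
pub fn count_events(q: &Query, events: &[Event]) -> u64 {
    events.iter().filter(|e| matches(q, e)).count() as u64
}
```

Sorting before paging matters: `offset` and `limit` only produce stable pages when the order is deterministic.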

2. Storage Implementation Strategies

A. SQLite Implementation

Use Case: Single-node development, embedded deployments, testing

Schema:

CREATE TABLE partition_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    partition_event_id TEXT NOT NULL UNIQUE,
    partition_ref TEXT NOT NULL,
    status INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    job_graph_run_id TEXT,
    job_run_id TEXT,
    producing_job_label TEXT,
    requesting_clients TEXT, -- JSON array
    delegated_to_run_id TEXT,
    upstream_deps TEXT,      -- JSON array
    downstream_deps TEXT,    -- JSON array
    manifest_data BLOB,      -- Protobuf serialized
    failure_reason TEXT
);

-- Indexes for performance (SQLite has no inline INDEX clause, so each is a separate statement)
CREATE INDEX idx_partition_ref ON partition_events (partition_ref);
CREATE INDEX idx_timestamp ON partition_events (timestamp);
CREATE INDEX idx_job_graph_run_id ON partition_events (job_graph_run_id);
CREATE INDEX idx_status ON partition_events (status);
CREATE INDEX idx_partition_timestamp ON partition_events (partition_ref, timestamp DESC);

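-- Latest-event lookup table; SQLite has no materialized views, so the store
-- must update this table itself whenever it appends an event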
CREATE TABLE partition_latest_events (
    partition_ref TEXT PRIMARY KEY,
    latest_event_id INTEGER NOT NULL,
    status INTEGER NOT NULL,
    timestamp INTEGER NOT NULL,
    FOREIGN KEY (latest_event_id) REFERENCES partition_events(id)
);

Implementation Highlights:

  • WAL mode for better concurrency
  • Prepared statements for performance
  • Periodic VACUUM for maintenance
  • JSON functions for array queries
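
The `pragma` map in the SQLite configuration could be turned into startup statements along these lines. This is a hedged sketch: `pragma_statements` is a hypothetical helper, defaulting `journal_mode` to WAL is an assumption drawn from the first highlight above, and the character checks are one simple way to keep configuration values from injecting SQL.

```rust
use std::collections::HashMap;

/// Build `PRAGMA name = value;` statements from the configured map.
/// `journal_mode` defaults to WAL when the configuration does not set it.
pub fn pragma_statements(pragma: &HashMap<String, String>) -> Result<Vec<String>, String> {
    let mut merged = pragma.clone();
    merged
        .entry("journal_mode".to_string())
        .or_insert_with(|| "WAL".to_string());

    // Names must be identifiers; values may also carry a minus sign (e.g. cache_size).
    let valid_name = |s: &str| !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_');
    let valid_value =
        |s: &str| !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');

    // Sort the names so the statements are emitted in a deterministic order.
    let mut names: Vec<&String> = merged.keys().collect();
    names.sort();

    let mut statements = Vec::new();
    for name in names {
        let value = &merged[name];
        if !valid_name(name) || !valid_value(value) {
            return Err(format!("invalid pragma: {} = {}", name, value));
        }
        statements.push(format!("PRAGMA {} = {};", name, value));
    }
    Ok(statements)
}
```

The store would run these statements on each new connection before issuing any queries.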

B. PostgreSQL Implementation

Use Case: Production deployments, high concurrency, complex queries

Schema:

CREATE TABLE partition_events (
    id BIGSERIAL,
    partition_event_id UUID NOT NULL,
    partition_ref TEXT NOT NULL,
    status INTEGER NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    job_graph_run_id TEXT,
    job_run_id TEXT,
    producing_job_label TEXT,
    requesting_clients JSONB,
    delegated_to_run_id TEXT,
    upstream_deps JSONB,
    downstream_deps JSONB,
    manifest_data BYTEA,
    failure_reason TEXT,

    -- On a partitioned table, primary key and unique constraints must include the partition key
    PRIMARY KEY (id, timestamp),
    UNIQUE (partition_event_id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Optimized indexes (PostgreSQL has no inline INDEX clause, so each is a separate statement)
CREATE INDEX idx_partition_ref_timestamp ON partition_events USING BTREE (partition_ref, timestamp DESC);
CREATE INDEX idx_job_graph_run_id ON partition_events USING HASH (job_graph_run_id);
CREATE INDEX idx_status_timestamp ON partition_events USING BTREE (status, timestamp DESC);
CREATE INDEX idx_requesting_clients ON partition_events USING GIN (requesting_clients);

-- Partitioning by time for large datasets
CREATE TABLE partition_events_y2024m01 PARTITION OF partition_events
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

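-- Materialized view of latest events. PostgreSQL does not refresh it automatically:
-- run REFRESH MATERIALIZED VIEW CONCURRENTLY on a schedule or after writes
-- (the unique index below is what permits CONCURRENTLY)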
CREATE MATERIALIZED VIEW partition_latest_events AS
SELECT DISTINCT ON (partition_ref) 
    partition_ref, id, status, timestamp
FROM partition_events 
ORDER BY partition_ref, timestamp DESC;

CREATE UNIQUE INDEX ON partition_latest_events (partition_ref);

Implementation Highlights:

  • Connection pooling with deadpool-postgres
  • JSONB for flexible array queries
  • Partitioning by time for scalability
  • Materialized views with refresh strategies
  • Listen/Notify for real-time streaming
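
Time-based partitions have to be created before events arrive for them. As a rough sketch of what a `ByMonth` partition strategy could generate, the hypothetical helper below produces the DDL for one month in the same form as the `partition_events_y2024m01` example above; the function name and the idea of generating DDL strings in the store are assumptions.

```rust
/// Build the DDL for one monthly partition of `partition_events`.
/// Returns `None` when `month` is outside 1..=12.
pub fn monthly_partition_ddl(year: i32, month: u32) -> Option<String> {
    if !(1..=12).contains(&month) {
        return None;
    }
    // The upper bound is exclusive, so it is the first day of the next month.
    let (next_year, next_month) = if month == 12 {
        (year + 1, 1)
    } else {
        (year, month + 1)
    };
    Some(format!(
        "CREATE TABLE partition_events_y{:04}m{:02} PARTITION OF partition_events\n\
         FOR VALUES FROM ('{:04}-{:02}-01') TO ('{:04}-{:02}-01');",
        year, month, year, month, next_year, next_month
    ))
}
```

A maintenance task could call this for the current and following month so that inserts never hit a missing partition.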

C. Delta Lake Implementation

Use Case: Data lake environments, analytics workloads, historical analysis

Schema:

partition_events/
├── _delta_log/
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
└── part-00000-*.parquet

Table Schema:

Schema::new(vec![
    Field::new("partition_event_id", DataType::Utf8, false),
    Field::new("partition_ref", DataType::Utf8, false),
    Field::new("status", DataType::Int32, false),
    Field::new("timestamp", DataType::Timestamp(TimeUnit::Microsecond, None), false),
    Field::new("job_graph_run_id", DataType::Utf8, true),
    Field::new("job_run_id", DataType::Utf8, true),
    Field::new("producing_job_label", DataType::Utf8, true),
    Field::new("requesting_clients", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("delegated_to_run_id", DataType::Utf8, true),
    Field::new("upstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("downstream_deps", DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true),
    Field::new("manifest_data", DataType::Binary, true),
    Field::new("failure_reason", DataType::Utf8, true),
])

Implementation Highlights:

  • Partitioned by date for query performance
  • Z-ordering on partition_ref for clustering
  • Optimize operations for compaction
  • DataFusion for complex analytical queries
  • Streaming through Delta Lake's log tail
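
Partitioning by date requires deriving a date value from each event's microsecond timestamp before it is written. The sketch below shows one way to do that with the standard library only, using Howard Hinnant's `civil_from_days` calendar algorithm. The partition column name `date` and the function names are assumptions; a real implementation would more likely use a date library.

```rust
/// Convert days since 1970-01-01 into (year, month, day) in the proleptic
/// Gregorian calendar (Howard Hinnant's `civil_from_days` algorithm).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, counted from 1 March
    let mp = (5 * doy + 2) / 153; // 0 = March .. 11 = February
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January and February belong to the following calendar year.
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };
    (year, month, day)
}

/// Derive a Hive-style `date=YYYY-MM-DD` partition value from a UTC timestamp
/// expressed in microseconds since the epoch.
pub fn date_partition(timestamp_micros: i64) -> String {
    const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;
    // `div_euclid` rounds toward negative infinity, so pre-1970 timestamps
    // land on the correct day.
    let (y, m, d) = civil_from_days(timestamp_micros.div_euclid(MICROS_PER_DAY));
    format!("date={:04}-{:02}-{:02}", y, m, d)
}
```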

3. Configuration and Factory Pattern

Configuration Schema

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StoreConfig {
    SQLite {
        path: PathBuf,
        pragma: HashMap<String, String>,
    },
    PostgreSQL {
        connection_string: String,
        pool_size: Option<u32>,
        partition_strategy: Option<PartitionStrategy>,
    },
    DeltaLake {
        table_uri: String,
        storage_options: HashMap<String, String>,
        partition_columns: Vec<String>,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PartitionStrategy {
    ByMonth,
    ByWeek,
    ByDay,
}

Factory Implementation

pub struct PartitionActivityStoreFactory;

impl PartitionActivityStoreFactory {
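    // `StoreError` is a placeholder for an error type shared by all backends:
    // a trait object must name the trait's associated `Error` type.
    pub async fn create(config: &StoreConfig) -> Result<Box<dyn PartitionActivityStore<Error = StoreError>>, Box<dyn std::error::Error>> {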
        match config {
            StoreConfig::SQLite { path, pragma } => {
                let store = SQLitePartitionActivityStore::new(path, pragma).await?;
                Ok(Box::new(store))
            }
            StoreConfig::PostgreSQL { connection_string, pool_size, partition_strategy } => {
                let store = PostgreSQLPartitionActivityStore::new(
                    connection_string, 
                    pool_size.unwrap_or(10),
                    partition_strategy.clone()
                ).await?;
                Ok(Box::new(store))
            }
            StoreConfig::DeltaLake { table_uri, storage_options, partition_columns } => {
                let store = DeltaLakePartitionActivityStore::new(
                    table_uri, 
                    storage_options.clone(),
                    partition_columns.clone()
                ).await?;
                Ok(Box::new(store))
            }
        }
    }
}
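
One design point the factory raises is error handling: to hand back a single boxed trait object, every backend has to report errors through one common type. The following synchronous sketch illustrates that shape under stated assumptions. `StoreError`, `Store`, `Config`, and the two stand-in stores are hypothetical and much smaller than the real types.

```rust
use std::fmt;

/// Hypothetical error type shared by every backend, so callers can hold a
/// `Box<dyn Store>` without naming a backend-specific error.
#[derive(Debug, PartialEq)]
pub enum StoreError {
    InvalidConfig(String),
}

impl fmt::Display for StoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StoreError::InvalidConfig(msg) => write!(f, "invalid store config: {}", msg),
        }
    }
}

impl std::error::Error for StoreError {}

/// Reduced stand-in for `PartitionActivityStore`.
pub trait Store {
    fn backend(&self) -> &'static str;
    fn pool_size(&self) -> Option<u32> {
        None
    }
}

/// Reduced stand-in for `StoreConfig`.
pub enum Config {
    SQLite { path: String },
    PostgreSQL { connection_string: String, pool_size: Option<u32> },
}

struct SqliteStore;
struct PostgresStore {
    pool_size: u32,
}

impl Store for SqliteStore {
    fn backend(&self) -> &'static str {
        "sqlite"
    }
}

impl Store for PostgresStore {
    fn backend(&self) -> &'static str {
        "postgresql"
    }
    fn pool_size(&self) -> Option<u32> {
        Some(self.pool_size)
    }
}

/// Validate the configuration, then construct the matching backend.
pub fn create(config: &Config) -> Result<Box<dyn Store>, StoreError> {
    match config {
        Config::SQLite { path } if path.is_empty() => {
            Err(StoreError::InvalidConfig("empty SQLite path".to_string()))
        }
        Config::SQLite { .. } => Ok(Box::new(SqliteStore)),
        Config::PostgreSQL { connection_string, .. } if connection_string.is_empty() => {
            Err(StoreError::InvalidConfig("empty connection string".to_string()))
        }
        Config::PostgreSQL { pool_size, .. } => Ok(Box::new(PostgresStore {
            pool_size: pool_size.unwrap_or(10), // same default as the factory above
        })),
    }
}
```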

4. Protocol Buffer Extensions

New Message Types

// Enhanced partition event with storage metadata
message PartitionEvent {
  // Event identity
  string partition_event_id = 1;
  google.protobuf.Timestamp timestamp = 2;
  
  // Partition identification
  PartitionRef partition_ref = 3;
  PartitionStatus status = 4;
  
  // Build context
  string job_graph_run_id = 5;
  string job_run_id = 6;
  JobLabel producing_job = 7;
  
  // Coordination metadata
  repeated string requesting_clients = 8;
  string delegated_to_run_id = 9;
  
  // Dependencies
  repeated PartitionRef upstream_deps = 10;
  repeated PartitionRef downstream_deps = 11;
  
  // Data about the partition
  PartitionManifest manifest = 12;
  string failure_reason = 13;
  
  // Storage metadata (added)
  string storage_backend = 14;
  map<string, string> storage_metadata = 15;
}

// Query capabilities
message PartitionEventQuery {
  repeated PartitionRef partition_refs = 1;
  repeated PartitionStatus statuses = 2;
  repeated string job_graph_run_ids = 3;
  TimeRange time_range = 4;
  int32 limit = 5;
  int32 offset = 6;
  OrderBy order_by = 7;
}

message TimeRange {
  google.protobuf.Timestamp start = 1;
  google.protobuf.Timestamp end = 2;
}

message OrderBy {
  string field = 1;
  bool ascending = 2;
}

// Stream filtering
message EventStreamFilter {
  repeated PartitionRef partition_refs = 1;
  repeated PartitionStatus statuses = 2;
  google.protobuf.Timestamp since = 3;
}

// Coordination support
message ActiveBuild {
  PartitionRef partition_ref = 1;
  string job_graph_run_id = 2;
  PartitionStatus status = 3;
  repeated string requesting_clients = 4;
  google.protobuf.Timestamp started_at = 5;
}

message DependencyGraph {
  PartitionRef root = 1;
  repeated DependencyNode nodes = 2;
  repeated DependencyEdge edges = 3;
}

message DependencyNode {
  PartitionRef partition_ref = 1;
  PartitionStatus status = 2;
  google.protobuf.Timestamp last_updated = 3;
}

message DependencyEdge {
  PartitionRef source = 1;
  PartitionRef target = 2;
  DependencyType type = 3;
}

enum DependencyType {
  DEPENDS_ON = 0;
  PRODUCES = 1;
}
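
The `DependencyGraph` message pairs with the depth-limited `get_dependency_graph` call. A breadth-first walk over upstream dependencies is one plausible way to build it; the sketch below assumes the upstream edges are already available as an in-memory map and uses plain strings for partition refs. `Graph` and `dependency_graph` are illustrative names, and edges here are `(source, target)` pairs meaning "source depends on target".

```rust
use std::collections::{HashMap, HashSet, VecDeque};

#[derive(Debug, PartialEq)]
pub struct Graph {
    pub root: String,
    pub nodes: Vec<String>,
    pub edges: Vec<(String, String)>, // (source, target): source depends on target
}

/// Breadth-first walk from `root`, following upstream edges at most `depth` hops.
pub fn dependency_graph(upstream: &HashMap<String, Vec<String>>, root: &str, depth: u32) -> Graph {
    let mut seen: HashSet<String> = HashSet::new();
    seen.insert(root.to_string());
    let mut nodes = vec![root.to_string()];
    let mut edges = Vec::new();
    let mut queue = VecDeque::new();
    queue.push_back((root.to_string(), 0u32));

    while let Some((current, level)) = queue.pop_front() {
        if level == depth {
            continue; // depth limit reached: do not expand this node
        }
        for dep in upstream.get(&current).into_iter().flatten() {
            edges.push((current.clone(), dep.clone()));
            // `insert` returns false for nodes already visited, which also
            // keeps the walk finite if the data contains a cycle.
            if seen.insert(dep.clone()) {
                nodes.push(dep.clone());
                queue.push_back((dep.clone(), level + 1));
            }
        }
    }
    Graph { root: root.to_string(), nodes, edges }
}
```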

Enhanced Catalog Service

service EnhancedCatalog {
  // Existing catalog methods
  rpc ListJobGraphRuns(ListJobGraphRunsRequest) returns (ListJobGraphRunsResponse);
  rpc ListPartitions(ListPartitionManifestsRequest) returns (ListPartitionManifestsResponse);
  
  // New partition activity methods
  rpc AppendPartitionEvent(AppendPartitionEventRequest) returns (AppendPartitionEventResponse);
  rpc GetLatestPartitionEvent(GetLatestPartitionEventRequest) returns (GetLatestPartitionEventResponse);
  rpc QueryPartitionEvents(QueryPartitionEventsRequest) returns (QueryPartitionEventsResponse);
  rpc StreamPartitionEvents(StreamPartitionEventsRequest) returns (stream PartitionEvent);
  
  // Coordination methods
  rpc RequestPartition(RequestPartitionRequest) returns (RequestPartitionResponse);
  rpc GetActiveBuildStatus(GetActiveBuildStatusRequest) returns (GetActiveBuildStatusResponse);
  rpc GetDependencyGraph(GetDependencyGraphRequest) returns (GetDependencyGraphResponse);
}

// Request/Response message types
message AppendPartitionEventRequest {
  PartitionEvent event = 1;
}

message AppendPartitionEventResponse {
  string event_id = 1;
}

message GetLatestPartitionEventRequest {
  PartitionRef partition_ref = 1;
}

message GetLatestPartitionEventResponse {
  PartitionEvent event = 1;
}

message QueryPartitionEventsRequest {
  PartitionEventQuery query = 1;
}

message QueryPartitionEventsResponse {
  repeated PartitionEvent events = 1;
  int64 total_count = 2;
}

message StreamPartitionEventsRequest {
  EventStreamFilter filter = 1;
}

message RequestPartitionRequest {
  PartitionRef partition_ref = 1;
  string client_id = 2;
  RequestOptions options = 3;
}

message RequestOptions {
  bool allow_delegation = 1;
  int32 timeout_seconds = 2;
}

message RequestPartitionResponse {
  RequestResult result = 1;
}

message RequestResult {
  oneof result {
    PartitionLocation available = 1;
    DelegationToken delegated = 2;
    BuildToken building = 3;
  }
}

message PartitionLocation {
  PartitionRef partition_ref = 1;
  PartitionManifest manifest = 2;
}

message DelegationToken {
  string job_graph_run_id = 1;
  google.protobuf.Timestamp estimated_completion = 2;
}

message BuildToken {
  string job_graph_run_id = 1;
  google.protobuf.Timestamp started_at = 2;
}
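
The three arms of `RequestResult` suggest a decision made from the latest event for the requested partition. The messages above do not spell out that logic, so the following is only a guess at it: return the partition if it is already available, delegate to an in-flight build when the client allows delegation, and otherwise start a build. The `Status` values, `LatestEvent`, and `resolve_request` are all hypothetical.

```rust
/// Hypothetical status values; the real `PartitionStatus` enum is defined elsewhere.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Building,
    Available,
    Failed,
}

/// Mirrors the three arms of the `RequestResult` oneof.
#[derive(Debug, PartialEq)]
pub enum RequestResult {
    Available { partition_ref: String },
    Delegated { job_graph_run_id: String },
    Building { job_graph_run_id: String },
}

/// The fields of the latest event that the decision needs.
pub struct LatestEvent {
    pub status: Status,
    pub job_graph_run_id: String,
}

/// Assumed policy: reuse an available partition, join an in-flight build when
/// delegation is allowed, and start a new build in every other case.
pub fn resolve_request(
    partition_ref: &str,
    latest: Option<&LatestEvent>,
    allow_delegation: bool,
    new_run_id: &str,
) -> RequestResult {
    match latest {
        Some(e) if e.status == Status::Available => RequestResult::Available {
            partition_ref: partition_ref.to_string(),
        },
        Some(e) if e.status == Status::Building && allow_delegation => RequestResult::Delegated {
            job_graph_run_id: e.job_graph_run_id.clone(),
        },
        _ => RequestResult::Building {
            job_graph_run_id: new_run_id.to_string(),
        },
    }
}
```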

5. Implementation Phases

Phase 1: Core Infrastructure

  • Abstract trait definition
  • SQLite implementation (simplest)
  • Basic factory pattern
  • Unit tests for storage operations

Phase 2: PostgreSQL Implementation

  • Connection pooling setup
  • Advanced indexing strategies
  • Materialized view management
  • Performance benchmarking

Phase 3: Delta Lake Implementation

  • Delta Lake table setup
  • Parquet serialization
  • Query optimization
  • Analytics integration

Phase 4: Integration & Testing

  • End-to-end integration tests
  • Performance comparison benchmarks
  • Migration utilities between backends
  • Production deployment guides

6. Performance Considerations

Write Performance

  • SQLite: Single-writer limitation, but excellent for development
  • PostgreSQL: Excellent concurrent write performance with proper indexing
  • Delta Lake: Batch writes preferred; small files are compacted by periodic optimize operations

Read Performance

  • SQLite: Fast for simple queries, limited to a single host because the database is an embedded local file
  • PostgreSQL: Excellent with proper indexing and materialized views
  • Delta Lake: Optimized for analytical queries, Z-ordering for point lookups

Storage Efficiency

  • SQLite: Compact storage, limited by 281TB max size
  • PostgreSQL: Efficient with proper partitioning; no database-level size limit, though each table or partition is capped at 32 TB with the default block size
  • Delta Lake: Columnar storage, excellent compression, versioning overhead

7. Migration Strategy

Cross-Backend Migration

pub async fn migrate_events(
    source: &dyn PartitionActivityStore,
    target: &dyn PartitionActivityStore,
    batch_size: usize,
    time_range: Option<(Timestamp, Timestamp)>,
) -> Result<u64, Box<dyn std::error::Error>> {
    // Implementation for moving events between stores
}
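
The body of `migrate_events` is left open above. One straightforward approach is to page through the source in a stable order and append each page to the target, which the sketch below shows with synchronous in-memory stores. `EventStore`, `VecStore`, and `query_page` are hypothetical stand-ins for the real trait and its query methods, and the half-open time range is an assumption.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub id: u64,
    pub timestamp: i64,
}

/// Hypothetical synchronous stand-in for the two operations migration needs.
pub trait EventStore {
    /// Events in a stable order, restricted to `time_range` (half-open, assumed).
    fn query_page(&self, time_range: Option<(i64, i64)>, offset: usize, limit: usize) -> Vec<Event>;
    fn append_events(&mut self, events: &[Event]);
}

#[derive(Default)]
pub struct VecStore {
    pub events: Vec<Event>,
}

impl EventStore for VecStore {
    fn query_page(&self, time_range: Option<(i64, i64)>, offset: usize, limit: usize) -> Vec<Event> {
        let mut hits: Vec<Event> = self
            .events
            .iter()
            .filter(|e| time_range.map_or(true, |(start, end)| e.timestamp >= start && e.timestamp < end))
            .cloned()
            .collect();
        // A deterministic order is what makes offset-based paging safe.
        hits.sort_by_key(|e| (e.timestamp, e.id));
        hits.into_iter().skip(offset).take(limit).collect()
    }

    fn append_events(&mut self, events: &[Event]) {
        self.events.extend_from_slice(events);
    }
}

/// Copy events from `source` to `target` in batches; returns the number copied.
pub fn migrate_events(
    source: &dyn EventStore,
    target: &mut dyn EventStore,
    batch_size: usize,
    time_range: Option<(i64, i64)>,
) -> Result<u64, String> {
    if batch_size == 0 {
        return Err("batch_size must be greater than zero".to_string());
    }
    let mut migrated: u64 = 0;
    loop {
        let page = source.query_page(time_range, migrated as usize, batch_size);
        if page.is_empty() {
            return Ok(migrated);
        }
        target.append_events(&page);
        migrated += page.len() as u64;
    }
}
```

Offset paging assumes the source is not written to during the migration; against a live source, paging by the last seen timestamp and id would be more robust.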

Schema Evolution

  • Forward-compatible protobuf changes
  • Storage-specific schema migration scripts
  • Version tracking in metadata

This design keeps the partition activity log behind a single storage trait, so each deployment can choose SQLite, PostgreSQL, or Delta Lake according to its scale, concurrency, and query requirements.