databuild/plans/18-bel-refactor.md

BEL Refactoring to 3-Tier Architecture

Overview

This plan restructures DataBuild's Build Event Log (BEL) access layer from the current monolithic trait into a 3-tier architecture, as described in design/build-event-log.md. The refactoring separates storage from query logic and simplifies the codebase by removing the Delta Lake storage backend.

Current State Analysis

The current BEL implementation (databuild/event_log/mod.rs) has a single BuildEventLog trait that mixes:

  • Low-level storage operations (append_event, get_events_in_range)
  • High-level aggregation queries (list_build_requests, get_activity_summary)
  • Application-specific logic (get_latest_partition_status, get_active_builds_for_partition)

This creates several problems:

  • Storage backends must implement complex aggregation logic
  • No clear separation between storage and business logic
  • Difficult to extend with new query patterns
  • Delta Lake implementation adds unnecessary complexity

Target Architecture

1. Storage Layer: BELStorage Trait

Minimal append-only interface optimized for sequential scanning:

use async_trait::async_trait;

#[async_trait]
pub trait BELStorage: Send + Sync {
    /// Append a single event, returns the sequential index
    async fn append_event(&self, event: BuildEvent) -> Result<i64>;
    
    /// List events with filtering, starting from a given index
    async fn list_events(&self, since_idx: i64, filter: EventFilter) -> Result<EventPage>;
    
    /// Initialize storage backend (create tables, etc.)
    async fn initialize(&self) -> Result<()>;
}

#[derive(Debug, Clone)]
pub struct EventPage {
    pub events: Vec<BuildEvent>,
    pub next_idx: i64,
    pub has_more: bool,
}
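
To make the index and pagination contract concrete, here is a minimal synchronous, in-memory sketch. It is not the planned implementation: `MemStorage`, `Page`, the `String` event payload, and the `page_size` field are hypothetical stand-ins. It also assumes that `since_idx` is inclusive and that `next_idx` is the value to pass on the next call, neither of which the trait above specifies. Indices are 0-based here, whereas a SQLite rowid would normally start at 1.

```rust
/// Hypothetical stand-in for EventPage, with String events.
#[derive(Debug, Clone, PartialEq)]
pub struct Page {
    pub events: Vec<String>,
    pub next_idx: i64,
    pub has_more: bool,
}

/// Hypothetical in-memory, append-only store (illustration only).
pub struct MemStorage {
    events: Vec<String>,
    page_size: usize,
}

impl MemStorage {
    pub fn new(page_size: usize) -> Self {
        Self { events: Vec::new(), page_size }
    }

    /// Append one event and return its sequential index (0-based here).
    pub fn append_event(&mut self, event: &str) -> i64 {
        self.events.push(event.to_string());
        (self.events.len() - 1) as i64
    }

    /// Return at most `page_size` events starting at `since_idx` (assumed inclusive).
    pub fn list_events(&self, since_idx: i64) -> Page {
        let start = (since_idx.max(0) as usize).min(self.events.len());
        let end = (start + self.page_size).min(self.events.len());
        Page {
            events: self.events[start..end].to_vec(),
            next_idx: end as i64,
            has_more: end < self.events.len(),
        }
    }
}
```

A caller would keep requesting pages from `next_idx` until `has_more` is false. Existing events are never mutated, which is what keeps the storage layer append-only.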

2. Query Engine Layer: BELQueryEngine

App-layer aggregation that scans storage events:

use std::sync::Arc;

pub struct BELQueryEngine {
    storage: Arc<dyn BELStorage>,
}

impl BELQueryEngine {
    pub fn new(storage: Arc<dyn BELStorage>) -> Self {
        Self { storage }
    }
    
    /// Get latest status for a partition by scanning recent events
    pub async fn get_latest_partition_status(&self, partition_ref: &str) -> Result<Option<PartitionStatus>>;
    
    /// Get all build requests that are currently building a partition
    pub async fn get_active_builds_for_partition(&self, partition_ref: &str) -> Result<Vec<String>>;
    
    /// Get summary of a build request by aggregating its events
    pub async fn get_build_request_summary(&self, build_id: &str) -> Result<BuildRequestSummary>;
    
    /// List build requests with pagination and filtering
    pub async fn list_build_requests(&self, request: BuildsListRequest) -> Result<BuildsListResponse>;
    
    /// Get activity summary for dashboard
    pub async fn get_activity_summary(&self) -> Result<ActivityResponse>;
}
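
As an illustration of app-layer aggregation, the sketch below derives the builds currently working on a partition by folding over events in log order. The `Event` enum and its two variants are hypothetical simplifications; the real `BuildEvent` is defined in databuild.proto and is not shown in this plan.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified event shape (not the real BuildEvent).
#[derive(Debug, Clone)]
pub enum Event {
    BuildStarted { build_id: String, partition_ref: String },
    BuildFinished { build_id: String, partition_ref: String },
}

/// Fold events in log order: a build counts as active for a partition from
/// its start event until a matching finish event is seen.
pub fn active_builds_for_partition(events: &[Event], partition_ref: &str) -> Vec<String> {
    let mut active = BTreeSet::new();
    for event in events {
        match event {
            Event::BuildStarted { build_id, partition_ref: p } if p.as_str() == partition_ref => {
                active.insert(build_id.clone());
            }
            Event::BuildFinished { build_id, partition_ref: p } if p.as_str() == partition_ref => {
                active.remove(build_id);
            }
            // Events for other partitions do not affect the result.
            _ => {}
        }
    }
    // BTreeSet iteration yields build ids in sorted order.
    active.into_iter().collect()
}
```

The function only needs a slice of events, so this kind of logic can be unit-tested without any storage backend.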

3. Client Layer: Repository Pattern

Clean interfaces for the CLI, Service, and Dashboard. The public repository methods are unchanged; only the dependency they are constructed with changes:

// Existing repositories continue to work, but now use BELQueryEngine
pub struct PartitionsRepository {
    query_engine: Arc<BELQueryEngine>,
}

pub struct BuildsRepository {
    query_engine: Arc<BELQueryEngine>,
}

Implementation Plan

Phase 1: Create Storage Layer Interface

  1. Define New Storage Trait

    // In databuild/event_log/storage.rs
    pub trait BELStorage { /* as defined above */ }
    
    // Async and Arc-returning so that it matches the call sites in Phase 3
    pub async fn create_bel_storage(uri: &str) -> Result<Arc<dyn BELStorage>>;
    
  2. Add EventFilter to Protobuf

    // In databuild/databuild.proto
    message EventFilter {
      repeated string partition_refs = 1;
      repeated string partition_patterns = 2;  
      repeated string job_labels = 3;
      repeated string task_ids = 4;
      repeated string build_request_ids = 5;
    }
    
    message EventPage {
      repeated BuildEvent events = 1;
      int64 next_idx = 2;
      bool has_more = 3;
    }
    
  3. Implement SQLite Storage Backend

    // In databuild/event_log/sqlite_storage.rs
    pub struct SqliteBELStorage {
        pool: sqlx::SqlitePool,
    }
    
    #[async_trait]
    impl BELStorage for SqliteBELStorage {
        async fn append_event(&self, event: BuildEvent) -> Result<i64> {
            // Simple INSERT returning rowid
            let serialized = serde_json::to_string(&event)?;
            let row_id = sqlx::query("INSERT INTO build_events (event_data) VALUES (?)")
                .bind(serialized)
                .execute(&self.pool)
                .await?
                .last_insert_rowid();
            Ok(row_id)
        }
    
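        async fn list_events(&self, since_idx: i64, filter: EventFilter) -> Result<EventPage> {
            // Efficient sequential scan with filtering:
            //   1. Build a WHERE clause from the filter criteria
            //   2. Select rows starting from since_idx, ordered by rowid
            //   3. Return one page of results with next_idx and has_more set
            todo!("sequential scan with filtering")
        }
    
        async fn initialize(&self) -> Result<()> {
            // Create the build_events table if it does not already exist
            todo!("create tables")
        }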
    }
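
The plan does not spell out how the `EventFilter` fields combine. One plausible reading is sketched below as an in-memory predicate: an empty list imposes no constraint, values within a field are OR-ed, and fields are AND-ed. `Filter`, `EventMeta`, `field_matches`, and `filter_matches` are hypothetical names, and `partition_patterns` and `task_ids` are left out because the pattern syntax is not defined here.

```rust
/// Hypothetical mirror of three EventFilter fields.
#[derive(Debug, Default, Clone)]
pub struct Filter {
    pub partition_refs: Vec<String>,
    pub job_labels: Vec<String>,
    pub build_request_ids: Vec<String>,
}

/// Hypothetical per-event metadata that the filter is checked against.
#[derive(Debug, Clone)]
pub struct EventMeta {
    pub partition_ref: Option<String>,
    pub job_label: Option<String>,
    pub build_request_id: String,
}

/// An empty list accepts anything; otherwise the value must be present and listed.
fn field_matches(allowed: &[String], value: Option<&str>) -> bool {
    allowed.is_empty() || value.map_or(false, |v| allowed.iter().any(|a| a.as_str() == v))
}

/// Assumed semantics: OR within a field, AND across fields.
pub fn filter_matches(filter: &Filter, event: &EventMeta) -> bool {
    field_matches(&filter.partition_refs, event.partition_ref.as_deref())
        && field_matches(&filter.job_labels, event.job_label.as_deref())
        && field_matches(&filter.build_request_ids, Some(event.build_request_id.as_str()))
}
```

A SQL backend would express the same conditions in its WHERE clause. Keeping a reference predicate like this one makes it possible to check every backend against the same semantics.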
    

Phase 2: Create Query Engine Layer

  1. Implement BELQueryEngine

    // In databuild/event_log/query_engine.rs
    impl BELQueryEngine {
        pub async fn get_latest_partition_status(&self, partition_ref: &str) -> Result<Option<PartitionStatus>> {
            // Scan recent partition events to determine current status
            let filter = EventFilter {
                partition_refs: vec![partition_ref.to_string()],
                ..Default::default()
            };
    
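            let page = self.storage.list_events(0, filter).await?;
            // NOTE: only the first page is read here; a complete implementation
            // would keep fetching from page.next_idx while page.has_more is true.
            self.aggregate_partition_status(&page.events)
        }
    
        fn aggregate_partition_status(&self, events: &[BuildEvent]) -> Result<Option<PartitionStatus>> {
            // Walk through events chronologically to determine final partition status
            // Return the most recent status
            todo!("fold events into the latest status")
        }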
    }
    
  2. Implement All Current Query Methods

    • Port all methods from current BuildEventLog trait
    • Use event scanning and aggregation instead of complex SQL queries
    • Keep same return types for compatibility
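
One detail that scan-based queries need to handle is pagination: a single `list_events` call returns one page, so the most recent status may sit on a later page. The synchronous sketch below drains all pages via `has_more` and `next_idx` and keeps the last status seen. `StatusPage`, `latest_status`, the `paged` fixture, and the string statuses are all hypothetical.

```rust
/// Hypothetical page of partition statuses, in log order.
pub struct StatusPage {
    pub statuses: Vec<String>,
    pub next_idx: i64,
    pub has_more: bool,
}

/// Call `fetch_page` repeatedly, starting at index 0, until `has_more` is false,
/// and return the last status observed (None if no matching events exist).
pub fn latest_status<F>(mut fetch_page: F) -> Option<String>
where
    F: FnMut(i64) -> StatusPage,
{
    let mut latest = None;
    let mut idx = 0;
    loop {
        let page = fetch_page(idx);
        if let Some(last) = page.statuses.into_iter().last() {
            latest = Some(last);
        }
        if !page.has_more {
            return latest;
        }
        idx = page.next_idx;
    }
}

/// Hypothetical fixture: serve `log` in pages of two entries.
pub fn paged(log: &[&str], idx: i64) -> StatusPage {
    let start = (idx.max(0) as usize).min(log.len());
    let end = (start + 2).min(log.len());
    StatusPage {
        statuses: log[start..end].iter().map(|s| s.to_string()).collect(),
        next_idx: end as i64,
        has_more: end < log.len(),
    }
}
```

Scanning from index 0 on every query is the simplest correct approach. Whether to cache a last-seen index per partition is a separate optimization that this plan does not address.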

Phase 3: Migrate Existing Code

  1. Update Repository Constructors

    // Old: PartitionsRepository::new(Arc<dyn BuildEventLog>)
    // New: PartitionsRepository::new(Arc<BELQueryEngine>)
    
    impl PartitionsRepository {
        pub fn new(query_engine: Arc<BELQueryEngine>) -> Self {
            Self { query_engine }
        }
    
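        pub async fn list_protobuf(&self, request: PartitionsListRequest) -> Result<PartitionsListResponse> {
            // Assumes BELQueryEngine gains a list_partitions method when the
            // remaining queries are ported in Phase 2
            self.query_engine.list_partitions(request).await
        }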
    }
    
  2. Update CLI and Service Initialization

    // In CLI main.rs and service mod.rs
    let storage = create_bel_storage(&event_log_uri).await?;
    let query_engine = Arc::new(BELQueryEngine::new(storage));
    
    let partitions_repo = PartitionsRepository::new(query_engine.clone());
    let builds_repo = BuildsRepository::new(query_engine.clone());
    

Phase 4: Remove Legacy Components

  1. Remove Delta Lake Implementation

    // Delete databuild/event_log/delta.rs
    // Remove delta dependencies from MODULE.bazel
    // Remove delta:// support from create_build_event_log()
    
  2. Deprecate Old BuildEventLog Trait

    // Mark as deprecated, keep for backwards compatibility during transition
    #[deprecated(note = "Use BELQueryEngine and BELStorage instead")]
    pub trait BuildEventLog { /* existing implementation */ }
    
  3. Update Factory Function

    // In databuild/event_log/mod.rs
    pub async fn create_build_event_log(uri: &str) -> Result<Arc<BELQueryEngine>> {
        let storage = if uri == "stdout" {
            Arc::new(stdout::StdoutBELStorage::new()) as Arc<dyn BELStorage>
        } else if let Some(path) = uri.strip_prefix("sqlite://") {
            let storage = sqlite_storage::SqliteBELStorage::new(path).await?;
            storage.initialize().await?;
            Arc::new(storage) as Arc<dyn BELStorage>
        } else if uri.starts_with("postgres://") {
            let storage = postgres_storage::PostgresBELStorage::new(uri).await?;
            storage.initialize().await?;
            Arc::new(storage) as Arc<dyn BELStorage>
        } else {
            return Err(BuildEventLogError::ConnectionError(
                format!("Unsupported build event log URI: {}", uri)
            ));
        };
    
        Ok(Arc::new(BELQueryEngine::new(storage)))
    }
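
The URI dispatch in the factory can be exercised without constructing any backend. This sketch isolates the scheme parsing, using `str::strip_prefix` to split off the `sqlite://` scheme. `Backend` and `parse_bel_uri` are hypothetical names that do not appear in the plan.

```rust
/// Hypothetical description of which backend a URI selects.
#[derive(Debug, PartialEq)]
pub enum Backend {
    Stdout,
    Sqlite { path: String },
    Postgres { uri: String },
}

/// Classify a build event log URI using the same three cases as the factory.
pub fn parse_bel_uri(uri: &str) -> Result<Backend, String> {
    if uri == "stdout" {
        Ok(Backend::Stdout)
    } else if let Some(path) = uri.strip_prefix("sqlite://") {
        // Everything after the scheme is treated as the database path.
        Ok(Backend::Sqlite { path: path.to_string() })
    } else if uri.starts_with("postgres://") {
        // The Postgres backend receives the full URI, as in the factory.
        Ok(Backend::Postgres { uri: uri.to_string() })
    } else {
        Err(format!("Unsupported build event log URI: {}", uri))
    }
}
```

With Delta Lake removed, a `delta://` URI falls through to the error branch, which gives a quick regression check for the Phase 4 removal.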
    

Phase 5: Final Cleanup

  1. Remove Legacy Implementations

    • Delete complex aggregation logic from existing storage backends
    • Simplify remaining backends to implement only new BELStorage trait
    • Remove deprecated BuildEventLog trait
  2. Update Documentation

    • Update design docs to reflect new architecture
    • Create migration guide for external users
    • Update code examples and README

Benefits of 3-Tier Architecture

Simplified Codebase

  • Removes complex Delta Lake dependencies
  • Storage backends focus only on append + scan operations
  • Clear separation between storage and business logic

Better Maintainability

  • Single SQLite implementation for most use cases
  • Query logic centralized in one place
  • Easier to debug and test each layer independently

Future-Ready Foundation

  • Clean foundation for wants system (next phase)
  • Easy to add new storage backends when needed
  • Query engine ready for cross-graph coordination APIs

Performance Benefits

  • Eliminates complex SQL joins in storage layer
  • Enables sequential scanning optimizations
  • Cleaner separation allows targeted optimizations

Success Criteria

Phase 1-2: Foundation

  • Storage layer trait compiles and tests pass
  • SQLite storage backend supports append + list operations
  • Query engine provides same functionality as current BEL trait
  • EventFilter protobuf types generate correctly

Phase 3-4: Migration

  • All repositories work with new query engine
  • CLI and service use new architecture
  • Existing functionality unchanged from user perspective
  • Delta Lake implementation removed

Phase 5: Completion

  • Legacy BEL trait removed
  • Performance meets or exceeds current implementation
  • Documentation updated for new architecture
  • Codebase simplified and maintainable

Risk Mitigation

  1. Gradual Migration: Implement new architecture alongside existing code
  2. Feature Parity: Ensure all existing functionality works before removing old code
  3. Performance Testing: Benchmark new implementation against current performance
  4. Simple First: Start with SQLite-only implementation, add complexity later as needed