From bc61d8f53085bf878dd535aa4eb296e6db23b970 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Mon, 13 Oct 2025 13:29:04 -0700
Subject: [PATCH] partition schedulability

---
 databuild/build_event_log.rs  | 166 +++++++++++++--------------
 databuild/build_state.rs      | 206 +++++++++++++++++++++++++++++++++-
 databuild/event_transforms.rs |  11 +-
 databuild/orchestrator.rs     |  93 ++++++++++++++-
 4 files changed, 382 insertions(+), 94 deletions(-)

diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs
index 1caae1f..0ed6958 100644
--- a/databuild/build_event_log.rs
+++ b/databuild/build_event_log.rs
@@ -17,7 +17,7 @@ pub trait BELStorage {
     ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct MemoryBELStorage {
     pub events: Vec<DataBuildEvent>,
 }
@@ -153,7 +153,7 @@ impl BELStorage for SqliteBELStorage {
     }
 }
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct BuildEventLog<S: BELStorage> {
     pub storage: S,
     pub state: BuildState,
 }
@@ -169,101 +169,103 @@ impl<S: BELStorage> BuildEventLog<S> {
         let idx = self.storage.append_event(event)?;
         Ok(idx)
     }
+}

-    pub fn schedulable_wants(&self) -> Vec<WantDetail> {
-        todo!()
+impl<S: BELStorage + Clone> Clone for BuildEventLog<S> {
+    fn clone(&self) -> Self {
+        Self {
+            storage: self.storage.clone(),
+            state: self.state.clone(),
+        }
     }
 }

 mod tests {
-    use uuid::Uuid;
-    use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
-    use crate::data_build_event::Event;
-    use crate::{PartitionRef, WantCreateEventV1};
-    use crate::build_state::BuildState;
+    mod sqlite_bel_storage {
+        use uuid::Uuid;
+        use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
+        use crate::data_build_event::Event;
+        use crate::{PartitionRef, WantCreateEventV1};
+        use crate::build_state::BuildState;

-    #[test]
-    fn test_hello() {
-        assert_eq!(2 + 3, 5);
-    }
+        #[test]
+        fn test_sqlite_append_event() {
+            let storage =
+                SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
+            let state = BuildState::default();
+            let mut log = BuildEventLog { storage, state };

-    #[test]
-    fn test_sqlite_append_event() {
-        let storage =
-            SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
-        let state = BuildState::default();
-        let mut log = BuildEventLog { storage, state };
+            let want_id = "sqlite_test_1234".to_string();

-        let want_id = "sqlite_test_1234".to_string();
+            // Initial state - verify storage is empty
+            let events = log
+                .storage
+                .list_events(0, 100)
+                .expect("Failed to list events");
+            assert_eq!(events.len(), 0);

-        // Initial state - verify storage is empty
-        let events = log
-            .storage
-            .list_events(0, 100)
-            .expect("Failed to list events");
-        assert_eq!(events.len(), 0);
+            // Verify want doesn't exist in state
+            assert!(log.state.get_want(&want_id).is_none());

-        // Verify want doesn't exist in state
-        assert!(log.state.get_want(&want_id).is_none());
+            // Append an event
+            let mut e = WantCreateEventV1::default();
+            e.want_id = want_id.clone();
+            e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
+            let event_id = log
+                .append_event(&Event::WantCreateV1(e))
+                .expect("append_event failed");

-        // Append an event
-        let mut e = WantCreateEventV1::default();
-        e.want_id = want_id.clone();
-        e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
-        let event_id = log
-            .append_event(&Event::WantCreateV1(e))
-            .expect("append_event failed");
+            // Verify event was stored
+            assert!(event_id > 0);

-        // Verify event was stored
-        assert!(event_id > 0);
+            // Verify event can be retrieved
+            let events = log
+                .storage
+                .list_events(0, 100)
+                .expect("Failed to list events");
+            assert_eq!(events.len(), 1);

-        // Verify event can be retrieved
-        let events = log
-            .storage
-            .list_events(0, 100)
-            .expect("Failed to list events");
-        assert_eq!(events.len(), 1);
+            let stored_event = &events[0];
+            assert_eq!(stored_event.event_id, event_id);
+            assert!(stored_event.timestamp > 0);

-        let stored_event = &events[0];
-        assert_eq!(stored_event.event_id, event_id);
-        assert!(stored_event.timestamp > 0);
+            // Verify the event content
+            if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
+                assert_eq!(want_event.want_id, want_id);
+                assert_eq!(want_event.partitions.len(), 1);
+                assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
+            } else {
+                panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
+            }

-        // Verify the event content
-        if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
-            assert_eq!(want_event.want_id, want_id);
-            assert_eq!(want_event.partitions.len(), 1);
-            assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
-        } else {
-            panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
+            // Verify state was updated
+            assert!(
+                log.state.get_want(&want_id).is_some(),
+                "want_id not found in state"
+            );
+            assert_eq!(
+                log.state.get_want(&want_id)
+                    .map(|want| want.want_id.clone())
+                    .expect("state.wants want_id not found"),
+                want_id,
+                "want_id not equal in state",
+            );
+
+            let mut e2 = WantCreateEventV1::default();
+            e2.want_id = Uuid::new_v4().into();
+            log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
+            let mut e3 = WantCreateEventV1::default();
+            e3.want_id = Uuid::new_v4().into();
+            log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
+            let mut e4 = WantCreateEventV1::default();
+            e4.want_id = Uuid::new_v4().into();
+            log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
+
+            let events = log
+                .storage
+                .list_events(0, 100)
+                .expect("Failed to list events");
+            assert_eq!(events.len(), 4);
         }
-
-        // Verify state was updated
-        assert!(
-            log.state.get_want(&want_id).is_some(),
-            "want_id not found in state"
-        );
-        assert_eq!(
-            log.state.get_want(&want_id)
-                .map(|want| want.want_id.clone())
-                .expect("state.wants want_id not found"),
-            want_id,
-            "want_id not equal in state",
-        );
-
-        let mut e2 = WantCreateEventV1::default();
-        e2.want_id = Uuid::new_v4().into();
-        log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
-        let mut e3 = WantCreateEventV1::default();
-        e3.want_id = Uuid::new_v4().into();
-        log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
-        let mut e4 = WantCreateEventV1::default();
-        e4.want_id = Uuid::new_v4().into();
-        log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
-
-        let events = log
-            .storage
-            .list_events(0, 100)
-            .expect("Failed to list events");
-        assert_eq!(events.len(), 4);
     }
 }
diff --git a/databuild/build_state.rs b/databuild/build_state.rs
index 16c7516..9f17590 100644
--- a/databuild/build_state.rs
+++ b/databuild/build_state.rs
@@ -1,8 +1,13 @@
 use crate::data_build_event::Event;
 use crate::util::current_timestamp;
-use crate::{JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
-use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
-use rusqlite::{Connection, ToSql};
+use crate::{
+    JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
+    ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
+    ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintDetail, WantDetail,
+    WantStatusCode,
+};
+use rusqlite::types::FromSql;
+use rusqlite::ToSql;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::error::Error;
@@ -28,7 +33,7 @@ This means no boxing or "query phase", and means we can have all state updates h
 and updates, which is exceptionally fast.
 */

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BuildState {
     wants: BTreeMap<String, WantDetail>,
     taints: BTreeMap<String, TaintDetail>,
@@ -48,7 +53,6 @@ impl Default for BuildState {
 }

 impl BuildState {
-
     pub fn handle_event(&mut self, event: &Event) -> () {
         match event {
             Event::WantCreateV1(e) => {
@@ -64,6 +68,14 @@
         }
     }

+    fn with_wants(self, wants: BTreeMap<String, WantDetail>) -> Self {
+        Self { wants, ..self }
+    }
+
+    fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
+        Self { partitions, ..self }
+    }
+
     pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
         self.wants.get(want_id).cloned()
     }
@@ -120,13 +132,98 @@
             page_size,
         }
     }
+
+    /**
+    A want is schedulable when its upstream partitions are live and not tainted
+    */
+    pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
+        let live_details: Vec<&PartitionDetail> = want
+            .partitions
+            .iter()
+            .map(|pref| self.partitions.get(&pref.r#ref))
+            .flatten()
+            .collect();
+        let live: Vec<PartitionRef> = live_details
+            .iter()
+            .map(|pd| pd.r#ref.clone().expect("pref must have ref"))
+            .collect();
+        let missing: Vec<PartitionRef> = want
+            .partitions
+            .iter()
+            .filter(|pref| self.partitions.get(&pref.r#ref).is_none())
+            .cloned()
+            .collect();
+        let tainted: Vec<PartitionRef> = live_details
+            .iter()
+            .filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
+            .map(|pref| pref.r#ref.clone().unwrap())
+            .collect();
+        WantSchedulability {
+            want: want.clone(),
+            status: WantUpstreamStatus {
+                live,
+                tainted,
+                missing,
+            },
+        }
+    }
+
+    pub fn schedulable_wants(&self) -> WantsSchedulability {
+        WantsSchedulability(
+            self.wants
+                .values()
+                // Do not consider fulfilled or currently building wants in schedulability query
+                .filter(|w| {
+                    w.status.clone().expect("want must have status").code
+                        != WantStatusCode::WantSuccessful as i32
+                })
+                .filter(|w| {
+                    w.status.clone().expect("want must have status").code
+                        != WantStatusCode::WantBuilding as i32
+                })
+                .cloned()
+                .map(|w| self.want_schedulability(&w))
+                .collect(),
+        )
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct WantUpstreamStatus {
+    pub live: Vec<PartitionRef>,
+    pub tainted: Vec<PartitionRef>,
+    pub missing: Vec<PartitionRef>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct WantSchedulability {
+    pub want: WantDetail,
+    pub status: WantUpstreamStatus,
+}
+
+pub struct WantsSchedulability(pub Vec<WantSchedulability>);
+
+impl Into<bool> for WantsSchedulability {
+    fn into(self) -> bool {
+        self.0.iter().all(|w| w.is_schedulable())
+    }
+}
+
+impl WantSchedulability {
+    pub fn is_schedulable(&self) -> bool {
+        self.status.missing.is_empty() && self.status.tainted.is_empty()
+    }
 }

 fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
     // TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
     let start = page * page_size;
-    let end = start + page_size;
-    map.values().skip(start as usize).take(end as usize).cloned().collect()
+    map.values()
+        .skip(start as usize)
+        .take(page_size as usize)
+        .cloned()
+        .collect()
 }

 mod consts {
@@ -135,6 +232,103 @@

 #[cfg(test)]
 mod tests {
+    mod schedulable_wants {
+        use crate::build_state::BuildState;
+        use crate::{
+            PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantDetail,
+            WantStatus, WantStatusCode,
+        };
+        use std::collections::BTreeMap;
+
+        impl WantDetail {
+            fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
+                Self { partitions, ..self }
+            }
+            fn with_status(self, status: Option<WantStatus>) -> Self {
+                Self { status, ..self }
+            }
+        }
+
+        impl PartitionDetail {
+            fn with_status(self, status: Option<PartitionStatus>) -> Self {
+                Self { status, ..self }
+            }
+            fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
+                Self { r#ref, ..self }
+            }
+        }
+
+        #[test]
+        fn test_empty_wants_noop() {
+            assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
+        }
+
+        // A want with satisfied upstreams (incl "none") should be schedulable
+        #[test]
+        fn test_simple_want_with_live_upstream_is_schedulable() {
+            // Given...
+            let test_partition = "test_partition";
+            let state = BuildState::default()
+                .with_wants(BTreeMap::from([(
+                    "foo".to_string(),
+                    WantDetail::default()
+                        .with_partitions(vec![test_partition.into()])
+                        .with_status(Some(WantStatusCode::WantIdle.into())),
+                )]))
+                .with_partitions(BTreeMap::from([(
+                    test_partition.to_string(),
+                    PartitionDetail::default().with_ref(Some(test_partition.into())),
+                )]));
+
+            // Should...
+            let schedulability = state.schedulable_wants();
+            let ws = schedulability.0.first().unwrap();
+            assert!(ws.is_schedulable());
+        }
+
+        #[test]
+        fn test_simple_want_without_live_upstream_is_not_schedulable() {
+            // Given...
+            let test_partition = "test_partition";
+            let state = BuildState::default().with_wants(BTreeMap::from([(
+                test_partition.to_string(),
+                WantDetail::default()
+                    .with_partitions(vec![test_partition.into()])
+                    .with_status(Some(WantStatusCode::WantIdle.into())),
+            )]));
+
+            // Should...
+            let schedulability = state.schedulable_wants();
+            let ws = schedulability.0.first().unwrap();
+            assert!(!ws.is_schedulable());
+        }
+
+        #[test]
+        fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
+            // Given...
+            let test_partition = "test_partition";
+            let state = BuildState::default()
+                .with_wants(BTreeMap::from([(
+                    "foo".to_string(),
+                    WantDetail::default()
+                        .with_partitions(vec![test_partition.into()])
+                        .with_status(Some(WantStatusCode::WantIdle.into())),
+                )]))
+                .with_partitions(BTreeMap::from([(
+                    test_partition.to_string(),
+                    PartitionDetail::default()
+                        .with_ref(Some(test_partition.into()))
+                        .with_status(Some(PartitionStatusCode::PartitionTainted.into())),
+                )]));
+
+            // Should...
+            let schedulability = state.schedulable_wants();
+            let ws = schedulability.0.first().unwrap();
+            assert!(!ws.is_schedulable());
+        }
+    }
+
     mod sqlite_build_state {
         mod want {
             use crate::build_state::BuildState;
diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs
index 5efddf4..25d913e 100644
--- a/databuild/event_transforms.rs
+++ b/databuild/event_transforms.rs
@@ -1,5 +1,5 @@
 use crate::util::current_timestamp;
-use crate::{PartitionRef, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
+use crate::{PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
 use crate::data_build_event::Event;

 impl From<&WantCreateEventV1> for WantDetail {
@@ -50,3 +50,12 @@ impl From<&str> for PartitionRef {
     }
 }
+
+impl From<PartitionStatusCode> for PartitionStatus {
+    fn from(code: PartitionStatusCode) -> Self {
+        PartitionStatus {
+            code: code.into(),
+            name: code.as_str_name().to_string(),
+        }
+    }
+}
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index 89a6bf2..943bc6e 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -1,4 +1,4 @@
-use crate::build_event_log::{BELStorage, BuildEventLog};
+use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
 use crate::job::JobConfiguration;
 use crate::job_run::JobRun;
 use crate::{PartitionRef, WantDetail};
@@ -17,6 +17,52 @@ struct Orchestrator<S: BELStorage> {
     config: OrchestratorConfig,
 }

+impl Default for Orchestrator<MemoryBELStorage> {
+    fn default() -> Self {
+        Self {
+            bel: Default::default(),
+            job_runs: Default::default(),
+            config: Default::default(),
+        }
+    }
+}
+
+impl<S: BELStorage + Clone> Orchestrator<S> {
+    fn copy(&self) -> Self {
+        Self {
+            bel: self.bel.clone(),
+            job_runs: Default::default(),
+            config: self.config.clone(),
+        }
+    }
+}
+
+impl<S: BELStorage> Orchestrator<S> {
+    fn with_config(self, config: OrchestratorConfig) -> Self {
+        Self {
+            bel: self.bel,
+            job_runs: self.job_runs,
+            config,
+        }
+    }
+
+    fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
+        Self {
+            bel: self.bel,
+            job_runs: self.job_runs,
+            config: self.config.with_jobs(jobs),
+        }
+    }
+
+    fn with_bel(self, bel: BuildEventLog<S>) -> Self {
+        Self {
+            bel,
+            job_runs: self.job_runs,
+            config: self.config,
+        }
+    }
+}
+
 struct JobRunHandle {
     job_run: JobRun,
     bel_idx: u64,
@@ -36,6 +82,14 @@ struct OrchestratorConfig {
     jobs: Vec<JobConfiguration>,
 }

+impl Default for OrchestratorConfig {
+    fn default() -> Self {
+        Self {
+            jobs: Vec::default(),
+        }
+    }
+}
+
 impl OrchestratorConfig {
     fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
         self.jobs.iter().find(|job| job.label == label).cloned()
@@ -44,6 +98,14 @@ impl OrchestratorConfig {
     fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
         self.jobs.iter().find(|job| job.matches(pref)).cloned()
     }
+
+    fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
+        Self { jobs }
+    }
+
+    fn with_job(self, job: JobConfiguration) -> Self {
+        Self { jobs: vec![job] }
+    }
 }

 #[derive(Debug, Clone)]
@@ -95,8 +157,16 @@ impl<S: BELStorage> Orchestrator<S> {
     /** Continuously invoked function to watch wants and schedule new jobs */
     fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
         // Collect unhandled wants, group by job that handles each partition,
-        let grouped_wants =
-            Orchestrator::<S>::group_wants(&self.config, &self.bel.schedulable_wants());
+        let schedulability = self.bel.state.schedulable_wants();
+        let schedulable_wants = schedulability
+            .0
+            .iter()
+            .filter_map(|ws| match ws.is_schedulable() {
+                false => None,
+                true => Some(ws.want.clone()),
+            })
+            .collect();
+        let grouped_wants =
+            Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);

         if !grouped_wants.want_groups.is_empty() {
             // All wants must be mapped to jobs that can be handled
@@ -153,6 +223,13 @@

 #[cfg(test)]
 mod tests {
+    use crate::build_event_log::MemoryBELStorage;
+    use crate::orchestrator::Orchestrator;
+
+    fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
+        Orchestrator::default()
+    }
+
     // The orchestrator needs to be able to actually execute job runs
     mod run_jobs {
         // Use case: the orchestrator should be able to execute a spawned-process job
@@ -189,11 +266,17 @@ mod tests {
     // The orchestrator polls wants so that it can react to new wants created by users, or to wants
     // created by itself (for dep miss job run failures)
     mod poll_wants {
+        use crate::orchestrator::tests::build_orchestrator;
+
         // Use case: Empty schedulable wants is a valid case, and should create no new jobs.
         #[test]
-        #[ignore]
         fn test_empty_wants_noop() {
-            todo!()
+            let mut orchestrator = build_orchestrator();
+            assert!(orchestrator.job_runs.is_empty()); // Should init with no work to do
+            orchestrator
+                .poll_wants()
+                .expect("shouldn't fail to poll empty wants");
+            assert!(orchestrator.job_runs.is_empty()); // Should still be empty since no work to do
         }

         // Use case: Some schedulable wants with jobs that can be matched should launch those jobs
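
Reviewer sketch (not part of the patch): how the schedulability query introduced
above is meant to be consumed, mirroring the filtering poll_wants() now performs
before grouping wants into jobs. It uses only items defined in this patch
(BuildState::schedulable_wants, WantSchedulability::is_schedulable, the public
`want` field); the helper name `runnable_wants` is hypothetical.

    use crate::build_state::BuildState;
    use crate::WantDetail;

    // Keep only wants whose upstream partitions are all live and none tainted;
    // these are the wants the orchestrator may group into job runs.
    fn runnable_wants(state: &BuildState) -> Vec<WantDetail> {
        state
            .schedulable_wants() // WantsSchedulability(pub Vec<WantSchedulability>)
            .0
            .iter()
            .filter(|ws| ws.is_schedulable()) // no missing, no tainted upstreams
            .map(|ws| ws.want.clone())
            .collect()
    }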