partition schedulability

This commit is contained in:
Stuart Axelbrooke 2025-10-13 13:29:04 -07:00
parent ac567240ea
commit bc61d8f530
4 changed files with 382 additions and 94 deletions

View file

@ -17,7 +17,7 @@ pub trait BELStorage {
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemoryBELStorage {
pub events: Vec<DataBuildEvent>,
}
@ -153,7 +153,7 @@ impl BELStorage for SqliteBELStorage {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: BuildState,
@ -169,101 +169,103 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
let idx = self.storage.append_event(event)?;
Ok(idx)
}
}
pub fn schedulable_wants(&self) -> Vec<WantDetail> {
todo!()
impl Clone for BuildEventLog<MemoryBELStorage> {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
state: self.state.clone(),
}
}
}
#[cfg(test)]
mod tests {
    mod sqlite_bel_storage {
        use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
        use crate::build_state::BuildState;
        use crate::data_build_event::Event;
        use crate::{PartitionRef, WantCreateEventV1};
        use uuid::Uuid;

        #[test]
        fn test_hello() {
            assert_eq!(2 + 3, 5);
        }

        /// End-to-end: appending events to a SQLite-backed BEL persists them
        /// and updates the derived build state.
        #[test]
        fn test_sqlite_append_event() {
            let storage =
                SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
            let state = BuildState::default();
            let mut log = BuildEventLog { storage, state };

            let want_id = "sqlite_test_1234".to_string();

            // Initial state - verify storage is empty
            let events = log
                .storage
                .list_events(0, 100)
                .expect("Failed to list events");
            assert_eq!(events.len(), 0);

            // Verify want doesn't exist in state
            assert!(log.state.get_want(&want_id).is_none());

            // Append an event
            let mut e = WantCreateEventV1::default();
            e.want_id = want_id.clone();
            e.partitions = vec![PartitionRef { r#ref: "sqlite_partition_1234".to_string() }];
            let event_id = log
                .append_event(&Event::WantCreateV1(e))
                .expect("append_event failed");

            // Verify event was stored
            assert!(event_id > 0);

            // Verify event can be retrieved
            let events = log
                .storage
                .list_events(0, 100)
                .expect("Failed to list events");
            assert_eq!(events.len(), 1);

            let stored_event = &events[0];
            assert_eq!(stored_event.event_id, event_id);
            assert!(stored_event.timestamp > 0);

            // Verify the event content
            if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
                assert_eq!(want_event.want_id, want_id);
                assert_eq!(want_event.partitions.len(), 1);
                assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
            } else {
                panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
            }

            // Verify state was updated
            assert!(
                log.state.get_want(&want_id).is_some(),
                "want_id not found in state"
            );
            assert_eq!(
                log.state
                    .get_want(&want_id)
                    .map(|want| want.want_id.clone())
                    .expect("state.wants want_id not found"),
                want_id,
                "want_id not equal in state",
            );

            // Appending further events keeps extending the log
            let mut e2 = WantCreateEventV1::default();
            e2.want_id = Uuid::new_v4().into();
            log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
            let mut e3 = WantCreateEventV1::default();
            e3.want_id = Uuid::new_v4().into();
            log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
            let mut e4 = WantCreateEventV1::default();
            e4.want_id = Uuid::new_v4().into();
            log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
            let events = log
                .storage
                .list_events(0, 100)
                .expect("Failed to list events");
            assert_eq!(events.len(), 4);
        }
    }
}

View file

@ -1,8 +1,13 @@
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::{Connection, ToSql};
use crate::{
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintDetail, WantDetail,
WantStatusCode,
};
use rusqlite::types::FromSql;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error;
@ -28,7 +33,7 @@ This means no boxing or "query phase", and means we can have all state updates h
and updates, which is exceptionally fast.
*/
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BuildState {
wants: BTreeMap<String, WantDetail>,
taints: BTreeMap<String, TaintDetail>,
@ -48,7 +53,6 @@ impl Default for BuildState {
}
impl BuildState {
pub fn handle_event(&mut self, event: &Event) -> () {
match event {
Event::WantCreateV1(e) => {
@ -64,6 +68,14 @@ impl BuildState {
}
}
/// Builder-style helper: replace the wants map wholesale (used by tests).
fn with_wants(self, wants: BTreeMap<String, WantDetail>) -> Self {
    Self { wants, ..self }
}
/// Builder-style helper: replace the partitions map wholesale (used by tests).
fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
    Self { partitions, ..self }
}
/// Look up a want by id, returning a cloned copy so internal state
/// stays immutable to callers.
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
    self.wants.get(want_id).cloned()
}
@ -120,13 +132,98 @@ impl BuildState {
page_size,
}
}
/**
Wants are schedulable when their partition is live and not tainted.

Classifies each partition the want references into one of three buckets:
- `live`: a `PartitionDetail` exists for the ref
- `missing`: no detail entry exists at all
- `tainted`: a detail exists but is marked `PartitionTainted`
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
    // Partition details that are known (live) for this want's refs.
    let live_details: Vec<&PartitionDetail> = want
        .partitions
        .iter()
        .filter_map(|pref| self.partitions.get(&pref.r#ref))
        .collect();
    let live: Vec<PartitionRef> = live_details
        .iter()
        .map(|pd| pd.r#ref.clone().expect("pref must have ref"))
        .collect();
    // Referenced partitions with no detail entry at all.
    let missing: Vec<PartitionRef> = want
        .partitions
        .iter()
        .filter(|pref| !self.partitions.contains_key(&pref.r#ref))
        .cloned()
        .collect();
    // Live partitions currently marked tainted.
    let tainted: Vec<PartitionRef> = live_details
        .iter()
        .filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
        .map(|pd| pd.r#ref.clone().expect("pref must have ref"))
        .collect();
    WantSchedulability {
        want: want.clone(),
        status: WantUpstreamStatus {
            live,
            tainted,
            missing,
        },
    }
}
/// Compute schedulability for every want that is still actionable.
///
/// Fulfilled (successful) and currently-building wants are excluded up
/// front; everything else is evaluated via `want_schedulability`.
pub fn schedulable_wants(&self) -> WantsSchedulability {
    WantsSchedulability(
        self.wants
            .values()
            // Do not consider fulfilled or currently building wants in schedulability query
            .filter(|w| {
                let code = w.status.as_ref().expect("want must have status").code;
                code != WantStatusCode::WantSuccessful as i32
                    && code != WantStatusCode::WantBuilding as i32
            })
            .map(|w| self.want_schedulability(w))
            .collect(),
    )
}
}
/// Per-want breakdown of upstream partition status: which referenced
/// partitions are live, which of those are tainted, and which are
/// missing entirely.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
    pub live: Vec<PartitionRef>,
    pub tainted: Vec<PartitionRef>,
    pub missing: Vec<PartitionRef>,
}
/// A want paired with the upstream status used to decide whether it can
/// be scheduled.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
    pub want: WantDetail,
    pub status: WantUpstreamStatus,
}
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl Into<bool> for WantsSchedulability {
fn into(self) -> bool {
self.0.iter().all(|w| w.is_schedulable())
}
}
impl WantSchedulability {
    /// A want is schedulable when every upstream partition exists (none
    /// missing) and none of them are tainted.
    pub fn is_schedulable(&self) -> bool {
        self.status.missing.is_empty() && self.status.tainted.is_empty()
    }
}
/// Return page `page` (of `page_size` items) of the map's values, in key order.
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
    // TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
    let start = page * page_size;
    map.values()
        .skip(start as usize)
        // Take exactly one page. The previous code did `take(start + page_size)`,
        // which returned up to a full extra page of items.
        .take(page_size as usize)
        .cloned()
        .collect()
}
mod consts {
@ -135,6 +232,103 @@ mod consts {
#[cfg(test)]
mod tests {
mod schedulable_wants {
use crate::build_state::BuildState;
use crate::{
PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantDetail,
WantStatus, WantStatusCode,
};
use std::collections::BTreeMap;
// Test-only consuming builder helpers for WantDetail.
impl WantDetail {
    /// Replace the set of partitions this want references.
    fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
        Self { partitions, ..self }
    }
    /// Replace the want's status.
    fn with_status(self, status: Option<WantStatus>) -> Self {
        Self { status, ..self }
    }
}
// Test-only consuming builder helpers for PartitionDetail.
impl PartitionDetail {
    /// Replace the partition's status.
    fn with_status(self, status: Option<PartitionStatus>) -> Self {
        Self { status, ..self }
    }
    /// Replace the partition's ref.
    fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
        Self { r#ref, ..self }
    }
}
// No wants registered means nothing to schedule — and no panic.
#[test]
fn test_empty_wants_noop() {
    let schedulability = BuildState::default().schedulable_wants();
    assert!(schedulability.0.is_empty());
}
// A want with satisfied upstreams (incl "none") should be schedulable
#[test]
fn test_simple_want_with_live_upstream_is_schedulable() {
    // Given a single idle want whose one upstream partition is live...
    let upstream = "test_partition";
    let want = WantDetail::default()
        .with_partitions(vec![upstream.into()])
        .with_status(Some(WantStatusCode::WantIdle.into()));
    let partition = PartitionDetail::default().with_ref(Some(upstream.into()));
    let state = BuildState::default()
        .with_wants(BTreeMap::from([("foo".to_string(), want)]))
        .with_partitions(BTreeMap::from([(upstream.to_string(), partition)]));

    // ...it should be reported as schedulable.
    let schedulability = state.schedulable_wants();
    let first = schedulability.0.first().unwrap();
    assert!(first.is_schedulable());
}
#[test]
fn test_simple_want_without_live_upstream_is_not_schedulable() {
    // Given an idle want whose upstream partition has no detail entry...
    let upstream = "test_partition";
    let want = WantDetail::default()
        .with_partitions(vec![upstream.into()])
        .with_status(Some(WantStatusCode::WantIdle.into()));
    let state =
        BuildState::default().with_wants(BTreeMap::from([(upstream.to_string(), want)]));

    // ...the missing upstream should make it unschedulable.
    let schedulability = state.schedulable_wants();
    let first = schedulability.0.first().unwrap();
    assert!(!first.is_schedulable());
}
// A want whose upstream partition is tainted must NOT be schedulable
#[test]
fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
    // Given...
    let test_partition = "test_partition";
    let state = BuildState::default()
        .with_wants(BTreeMap::from([(
            "foo".to_string(),
            WantDetail::default()
                .with_partitions(vec![test_partition.into()])
                .with_status(Some(WantStatusCode::WantIdle.into())),
        )]))
        .with_partitions(BTreeMap::from([(
            test_partition.to_string(),
            PartitionDetail::default()
                .with_ref(Some(test_partition.into()))
                .with_status(Some(PartitionStatusCode::PartitionTainted.into())),
        )]));
    // Should...
    let schedulability = state.schedulable_wants();
    let ws = schedulability.0.first().unwrap();
    // The tainted upstream must block scheduling. The previous assertion
    // checked the opposite of what the test name states (and was #[ignore]d).
    assert!(!ws.is_schedulable());
}
}
mod sqlite_build_state {
mod want {
use crate::build_state::BuildState;

View file

@ -1,5 +1,5 @@
use crate::util::current_timestamp;
use crate::{PartitionRef, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::{PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::data_build_event::Event;
impl From<&WantCreateEventV1> for WantDetail {
@ -50,3 +50,12 @@ impl From<&str> for PartitionRef {
}
}
}
/// Build a full PartitionStatus from its status code, deriving the
/// human-readable name from the generated enum's string name.
impl From<PartitionStatusCode> for PartitionStatus {
    fn from(code: PartitionStatusCode) -> Self {
        PartitionStatus {
            code: code.into(),
            name: code.as_str_name().to_string(),
        }
    }
}

View file

@ -1,4 +1,4 @@
use crate::build_event_log::{BELStorage, BuildEventLog};
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::job::JobConfiguration;
use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail};
@ -17,6 +17,52 @@ struct Orchestrator<S: BELStorage + Debug> {
config: OrchestratorConfig,
}
/// Default orchestrator: in-memory BEL storage, no job runs, empty config.
impl Default for Orchestrator<MemoryBELStorage> {
    fn default() -> Self {
        Self {
            bel: Default::default(),
            job_runs: Default::default(),
            config: Default::default(),
        }
    }
}
impl Orchestrator<MemoryBELStorage> {
    /// Duplicate this orchestrator's event log and config.
    /// NOTE(review): `job_runs` is reset to empty rather than copied —
    /// presumably in-flight runs must not be shared between copies; confirm
    /// that this (and the name `copy`) reflects the intended semantics.
    fn copy(&self) -> Self {
        Self {
            bel: self.bel.clone(),
            job_runs: Default::default(),
            config: self.config.clone(),
        }
    }
}
impl<S: BELStorage + Debug> Orchestrator<S> {
    /// Builder-style: replace the whole orchestrator config.
    fn with_config(self, config: OrchestratorConfig) -> Self {
        Self { config, ..self }
    }

    /// Builder-style: replace the configured job set.
    fn with_jobs(mut self, jobs: Vec<JobConfiguration>) -> Self {
        self.config = self.config.with_jobs(jobs);
        self
    }

    /// Builder-style: swap in a different build event log.
    fn with_bel(self, bel: BuildEventLog<S>) -> Self {
        Self { bel, ..self }
    }
}
struct JobRunHandle {
job_run: JobRun,
bel_idx: u64,
@ -36,6 +82,14 @@ struct OrchestratorConfig {
jobs: Vec<JobConfiguration>,
}
/// An empty job list is the default configuration.
impl Default for OrchestratorConfig {
    fn default() -> Self {
        Self { jobs: Vec::new() }
    }
}
impl OrchestratorConfig {
fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.label == label).cloned()
@ -44,6 +98,14 @@ impl OrchestratorConfig {
/// Find the first configured job whose matcher accepts this partition ref.
fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
    self.jobs.iter().find(|job| job.matches(pref)).cloned()
}
/// Builder-style: replace the full job list.
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
    Self { jobs }
}
/// Builder-style: configure exactly one job.
/// NOTE(review): this discards any previously configured jobs rather than
/// appending — confirm replacement is the intended semantics.
fn with_job(self, job: JobConfiguration) -> Self {
    Self { jobs: vec![job] }
}
}
#[derive(Debug, Clone)]
@ -95,8 +157,16 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
// Collect unhandled wants, group by job that handles each partition,
let grouped_wants =
Orchestrator::<S>::group_wants(&self.config, &self.bel.schedulable_wants());
let schedulability = self.bel.state.schedulable_wants();
let schedulable_wants = schedulability
.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
if !grouped_wants.want_groups.is_empty() {
// All wants must be mapped to jobs that can be handled
@ -153,6 +223,13 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
mod tests {
use crate::build_event_log::MemoryBELStorage;
use crate::orchestrator::Orchestrator;
/// Test helper: a default orchestrator backed by in-memory BEL storage.
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
    Orchestrator::default()
}
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
@ -189,11 +266,17 @@ mod tests {
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::orchestrator::tests::build_orchestrator;
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
// (The stale `#[ignore]` and leftover `todo!()` from the pre-commit version
// are removed; the test now has a real body.)
#[test]
fn test_empty_wants_noop() {
    let mut orchestrator = build_orchestrator();
    assert!(orchestrator.job_runs.is_empty()); // Should init with no work to do
    orchestrator
        .poll_wants()
        .expect("shouldn't fail to poll empty wants");
    assert!(orchestrator.job_runs.is_empty()); // Should still be empty since no work to do
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs