fmt
Some checks failed
/ setup (push) Has been cancelled

This commit is contained in:
Stuart Axelbrooke 2025-09-15 20:40:26 -07:00
parent 5484363e52
commit b8cfdade16
3 changed files with 127 additions and 62 deletions

View file

@ -22,7 +22,9 @@ pub struct MemoryBELStorage {
} }
impl Default for MemoryBELStorage { impl Default for MemoryBELStorage {
fn default() -> Self { Self::new() } fn default() -> Self {
Self::new()
}
} }
impl MemoryBELStorage { impl MemoryBELStorage {
@ -52,7 +54,11 @@ impl BELStorage for MemoryBELStorage {
since_idx: u64, since_idx: u64,
limit: u64, limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> { ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
Ok(self.events.iter().cloned().filter(|e| e.timestamp > since_idx) Ok(self
.events
.iter()
.cloned()
.filter(|e| e.timestamp > since_idx)
.take(limit as usize) .take(limit as usize)
.collect()) .collect())
} }
@ -115,7 +121,7 @@ impl BELStorage for SqliteBELStorage {
"SELECT event_id, timestamp, event_data FROM events "SELECT event_id, timestamp, event_data FROM events
WHERE timestamp > ?1 WHERE timestamp > ?1
ORDER BY event_id ORDER BY event_id
LIMIT ?2" LIMIT ?2",
)?; )?;
let rows = stmt.query_map([since_idx, limit], |row| { let rows = stmt.query_map([since_idx, limit], |row| {
@ -124,12 +130,13 @@ impl BELStorage for SqliteBELStorage {
let event_data: Vec<u8> = row.get(2)?; let event_data: Vec<u8> = row.get(2)?;
// Deserialize the event using prost // Deserialize the event using prost
let mut dbe = DataBuildEvent::decode(event_data.as_slice()) let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|e| {
.map_err(|e| rusqlite::Error::InvalidColumnType( rusqlite::Error::InvalidColumnType(
0, 0,
"event_data".to_string(), "event_data".to_string(),
rusqlite::types::Type::Blob rusqlite::types::Type::Blob,
))?; )
})?;
// Update the event_id from the database // Update the event_id from the database
dbe.event_id = event_id; dbe.event_id = event_id;
@ -180,9 +187,13 @@ impl<B: BELStorage + Debug> BuildEventLog<B> {
Event::JobRunCancelV1(e) => {} Event::JobRunCancelV1(e) => {}
Event::JobRunMissingDepsV1(e) => {} Event::JobRunMissingDepsV1(e) => {}
Event::WantCreateV1(e) => { Event::WantCreateV1(e) => {
state state.wants.insert(
.wants e.want_id.clone(),
.insert(e.want_id.clone(), WantDetail { want_id: e.want_id, refs: e.partitions }); WantDetail {
want_id: e.want_id,
refs: e.partitions,
},
);
} }
Event::WantCancelV1(e) => {} Event::WantCancelV1(e) => {}
Event::TaintCreateV1(e) => {} Event::TaintCreateV1(e) => {}
@ -252,13 +263,17 @@ mod tests {
#[test] #[test]
fn test_sqlite_append_event() { fn test_sqlite_append_event() {
let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage"); let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let mut log = BuildEventLog::new(storage); let mut log = BuildEventLog::new(storage);
let want_id = "sqlite_test_1234".to_string(); let want_id = "sqlite_test_1234".to_string();
// Initial state - verify storage is empty // Initial state - verify storage is empty
let events = log.storage.list_events(0, 100).expect("Failed to list events"); let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 0); assert_eq!(events.len(), 0);
// Verify want doesn't exist in state // Verify want doesn't exist in state
@ -268,26 +283,30 @@ mod tests {
} }
// Append an event // Append an event
let event_id = log.append_event(Event::WantCreateV1(WantCreateEventV1 { let event_id = log
want_id: want_id.clone(), .append_event(Event::WantCreateV1(WantCreateEventV1 {
root_want_id: "sqlite_root_123".to_string(), want_id: want_id.clone(),
parent_want_id: "sqlite_parent_123".to_string(), root_want_id: "sqlite_root_123".to_string(),
partitions: vec![PartitionRef { parent_want_id: "sqlite_parent_123".to_string(),
r#ref: "sqlite_partition_1234".to_string(), partitions: vec![PartitionRef {
}], r#ref: "sqlite_partition_1234".to_string(),
data_timestamp: 0, }],
ttl_seconds: 1, data_timestamp: 0,
sla_seconds: 1, ttl_seconds: 1,
source: None, sla_seconds: 1,
comment: None, source: None,
})) comment: None,
.expect("append_event failed"); }))
.expect("append_event failed");
// Verify event was stored // Verify event was stored
assert!(event_id > 0); assert!(event_id > 0);
// Verify event can be retrieved // Verify event can be retrieved
let events = log.storage.list_events(0, 100).expect("Failed to list events"); let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 1); assert_eq!(events.len(), 1);
let stored_event = &events[0]; let stored_event = &events[0];
@ -307,7 +326,10 @@ mod tests {
// Verify state was updated // Verify state was updated
let state = log.state.read().expect("couldn't take read lock"); let state = log.state.read().expect("couldn't take read lock");
assert!(state.wants.get(&want_id).is_some(), "want_id not found in state"); assert!(
state.wants.get(&want_id).is_some(),
"want_id not found in state"
);
assert_eq!( assert_eq!(
state state
.wants .wants

View file

@ -1,8 +1,7 @@
use std::error::Error;
use regex::Regex;
use crate::job_run::JobRun; use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail}; use crate::{PartitionRef, WantDetail};
use regex::Regex;
use std::error::Error;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct JobConfiguration { pub struct JobConfiguration {
@ -14,14 +13,15 @@ pub struct JobConfiguration {
impl JobConfiguration { impl JobConfiguration {
/** Launch job to build the partitions specified by the provided wants. */ /** Launch job to build the partitions specified by the provided wants. */
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> { pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
let wanted_refs: Vec<PartitionRef> = wants.iter().flat_map(|want| want.refs.clone()).collect(); let wanted_refs: Vec<PartitionRef> =
wants.iter().flat_map(|want| want.refs.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
JobRun::spawn(self.entrypoint.clone(), args) JobRun::spawn(self.entrypoint.clone(), args)
} }
pub fn matches(&self, refs: &PartitionRef) -> bool { pub fn matches(&self, refs: &PartitionRef) -> bool {
let regex = Regex::new(&self.pattern) let regex =
.expect(&format!("Invalid regex pattern: {}", self.pattern)); Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern));
regex.is_match(&refs.r#ref) regex.is_match(&refs.r#ref)
} }
} }

View file

@ -1,17 +1,16 @@
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use crate::build_event_log::{BELStorage, BuildEventLog}; use crate::build_event_log::{BELStorage, BuildEventLog};
use crate::job::JobConfiguration; use crate::job::JobConfiguration;
use crate::job_run::JobRun; use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail}; use crate::{PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
/** /**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads + Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
the visitor pattern to monitor job exec progress and liveness, and adds the visitor pattern to monitor job exec progress and liveness, and adds
*/ */
struct Orchestrator<B: BELStorage + Debug> { struct Orchestrator<B: BELStorage + Debug> {
bel: BuildEventLog<B>, bel: BuildEventLog<B>,
job_runs: Vec<JobRunHandle>, job_runs: Vec<JobRunHandle>,
@ -24,7 +23,12 @@ struct JobRunHandle {
} }
impl From<JobRun> for JobRunHandle { impl From<JobRun> for JobRunHandle {
fn from(job_run: JobRun) -> Self { Self { job_run, bel_idx: 0 } } fn from(job_run: JobRun) -> Self {
Self {
job_run,
bel_idx: 0,
}
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -55,17 +59,31 @@ struct GroupedWants {
} }
impl<B: BELStorage + Debug> Orchestrator<B> { impl<B: BELStorage + Debug> Orchestrator<B> {
fn new(storage: B, config: OrchestratorConfig) -> Self { Self { bel: BuildEventLog::new(storage), job_runs: Vec::new(), config } } fn new(storage: B, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage),
job_runs: Vec::new(),
config,
}
}
/** Continuously invoked function to watch job run status */ /** Continuously invoked function to watch job run status */
fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> { fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
// Visit existing jobs, remove completed // Visit existing jobs, remove completed
self.job_runs.retain_mut(|jr| { self.job_runs.retain_mut(|jr| {
// Append emitted events // Append emitted events
let events = jr.job_run.visit(jr.bel_idx.clone()).expect("Job visit failed"); let events = jr
events.iter().filter_map(|event| event.event.clone()).for_each(|event| { .job_run
self.bel.append_event(event.clone()).expect("Failed to append event"); .visit(jr.bel_idx.clone())
}); .expect("Job visit failed");
events
.iter()
.filter_map(|event| event.event.clone())
.for_each(|event| {
self.bel
.append_event(event.clone())
.expect("Failed to append event");
});
// Retain job run if it doesn't yet have an exit code (still running) // Retain job run if it doesn't yet have an exit code (still running)
jr.job_run.exit_status().is_none() jr.job_run.exit_status().is_none()
@ -77,12 +95,17 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
/** Continuously invoked function to watch wants and schedule new jobs */ /** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> { fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
// Collect unhandled wants, group by job that handles each partition, // Collect unhandled wants, group by job that handles each partition,
let grouped_wants = Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants()); let grouped_wants =
Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
if !grouped_wants.want_groups.is_empty() { if !grouped_wants.want_groups.is_empty() {
// All wants must be mapped to jobs that can be handled // All wants must be mapped to jobs that can be handled
// TODO we probably want to handle this gracefully in the near future // TODO we probably want to handle this gracefully in the near future
Err(format!("Unable to map following wants: {:?}", &grouped_wants.want_groups).into()) Err(format!(
"Unable to map following wants: {:?}",
&grouped_wants.want_groups
)
.into())
} else { } else {
for wg in grouped_wants.want_groups { for wg in grouped_wants.want_groups {
let job_run = wg.job.spawn(wg.wants)?; let job_run = wg.job.spawn(wg.wants)?;
@ -106,12 +129,15 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
}); });
}); });
GroupedWants { GroupedWants {
want_groups: want_groups.iter().map(|(k, v)| { want_groups: want_groups
WantGroup { .iter()
job: config.job_configuration_for_label(k).expect(&format!("Job configuration not found for label `{}`", k)), .map(|(k, v)| WantGroup {
job: config
.job_configuration_for_label(k)
.expect(&format!("Job configuration not found for label `{}`", k)),
wants: v.to_owned(), wants: v.to_owned(),
} })
}).collect(), .collect(),
unhandled_wants, unhandled_wants,
} }
} }
@ -129,8 +155,8 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
mod tests { mod tests {
mod want_group { mod want_group {
use super::super::*; use super::super::*;
use crate::{PartitionRef, WantDetail};
use crate::build_event_log::MemoryBELStorage; use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration { fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration { JobConfiguration {
@ -143,9 +169,12 @@ mod tests {
fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail { fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
WantDetail { WantDetail {
want_id: want_id.to_string(), want_id: want_id.to_string(),
refs: partition_refs.iter().map(|r| PartitionRef { refs: partition_refs
r#ref: r.to_string(), .iter()
}).collect(), .map(|r| PartitionRef {
r#ref: r.to_string(),
})
.collect(),
} }
} }
@ -163,7 +192,9 @@ mod tests {
#[test] #[test]
fn test_group_wants_one_want_matches_job() { fn test_group_wants_one_want_matches_job() {
let job_config = create_job_config("test_job", "partition.*"); let job_config = create_job_config("test_job", "partition.*");
let config = OrchestratorConfig { jobs: vec![job_config.clone()] }; let config = OrchestratorConfig {
jobs: vec![job_config.clone()],
};
let want = create_want_detail("want1", vec!["partition1"]); let want = create_want_detail("want1", vec!["partition1"]);
let wants = vec![want.clone()]; let wants = vec![want.clone()];
@ -179,7 +210,9 @@ mod tests {
#[test] #[test]
fn test_group_wants_one_unmatching_want() { fn test_group_wants_one_unmatching_want() {
let job_config = create_job_config("test_job", "^test_pattern$"); let job_config = create_job_config("test_job", "^test_pattern$");
let config = OrchestratorConfig { jobs: vec![job_config] }; let config = OrchestratorConfig {
jobs: vec![job_config],
};
let want = create_want_detail("want1", vec!["different_partition"]); let want = create_want_detail("want1", vec!["different_partition"]);
let wants = vec![want.clone()]; let wants = vec![want.clone()];
@ -194,7 +227,9 @@ mod tests {
fn test_group_wants_multiple_wants_different_jobs() { fn test_group_wants_multiple_wants_different_jobs() {
let job_config1 = create_job_config("job1", "pattern1.*"); let job_config1 = create_job_config("job1", "pattern1.*");
let job_config2 = create_job_config("job2", "pattern2.*"); let job_config2 = create_job_config("job2", "pattern2.*");
let config = OrchestratorConfig { jobs: vec![job_config1, job_config2] }; let config = OrchestratorConfig {
jobs: vec![job_config1, job_config2],
};
let want1 = create_want_detail("want1", vec!["pattern1_partition"]); let want1 = create_want_detail("want1", vec!["pattern1_partition"]);
let want2 = create_want_detail("want2", vec!["pattern1_other"]); let want2 = create_want_detail("want2", vec!["pattern1_other"]);
@ -207,12 +242,20 @@ mod tests {
assert_eq!(result.want_groups.len(), 2); assert_eq!(result.want_groups.len(), 2);
// Find job1 group // Find job1 group
let job1_group = result.want_groups.iter().find(|wg| wg.job.label == "job1").unwrap(); let job1_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job1")
.unwrap();
assert_eq!(job1_group.wants.len(), 2); assert_eq!(job1_group.wants.len(), 2);
// Find job2 group // Find job2 group
let job2_group = result.want_groups.iter().find(|wg| wg.job.label == "job2").unwrap(); let job2_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job2")
.unwrap();
assert_eq!(job2_group.wants.len(), 1); assert_eq!(job2_group.wants.len(), 1);
} }
} }
} }