From b8cfdade165f1c2e51c29cad84533395fad88187 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Mon, 15 Sep 2025 20:40:26 -0700
Subject: [PATCH] fmt

---
 databuild/build_event_log.rs | 78 ++++++++++++++++++-----------
 databuild/job.rs             | 14 +++---
 databuild/orchestrator.rs    | 97 ++++++++++++++++++++++++++----------
 3 files changed, 127 insertions(+), 62 deletions(-)

diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs
index 9d3573f..a46d079 100644
--- a/databuild/build_event_log.rs
+++ b/databuild/build_event_log.rs
@@ -22,7 +22,9 @@ pub struct MemoryBELStorage {
 }
 
 impl Default for MemoryBELStorage {
-    fn default() -> Self { Self::new() }
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 impl MemoryBELStorage {
@@ -52,7 +54,11 @@ impl BELStorage for MemoryBELStorage {
         since_idx: u64,
         limit: u64,
     ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
-        Ok(self.events.iter().cloned().filter(|e| e.timestamp > since_idx)
+        Ok(self
+            .events
+            .iter()
+            .cloned()
+            .filter(|e| e.timestamp > since_idx)
             .take(limit as usize)
             .collect())
     }
@@ -115,7 +121,7 @@ impl BELStorage for SqliteBELStorage {
             "SELECT event_id, timestamp, event_data FROM events
             WHERE timestamp > ?1
             ORDER BY event_id
-            LIMIT ?2"
+            LIMIT ?2",
         )?;
 
         let rows = stmt.query_map([since_idx, limit], |row| {
@@ -124,12 +130,13 @@ impl BELStorage for SqliteBELStorage {
             let event_data: Vec<u8> = row.get(2)?;
 
             // Deserialize the event using prost
-            let mut dbe = DataBuildEvent::decode(event_data.as_slice())
-                .map_err(|e| rusqlite::Error::InvalidColumnType(
-                    0,
-                    "event_data".to_string(),
-                    rusqlite::types::Type::Blob
-                ))?;
+            let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|e| {
+                rusqlite::Error::InvalidColumnType(
+                    0,
+                    "event_data".to_string(),
+                    rusqlite::types::Type::Blob,
+                )
+            })?;
 
             // Update the event_id from the database
             dbe.event_id = event_id;
@@ -180,9 +187,13 @@ impl<B: BELStorage> BuildEventLog<B> {
             Event::JobRunCancelV1(e) => {}
             Event::JobRunMissingDepsV1(e) => {}
             Event::WantCreateV1(e) => {
-                state
-                    .wants
-                    .insert(e.want_id.clone(), WantDetail { want_id: e.want_id, refs: e.partitions });
+                state.wants.insert(
+                    e.want_id.clone(),
+                    WantDetail {
+                        want_id: e.want_id,
+                        refs: e.partitions,
+                    },
+                );
             }
             Event::WantCancelV1(e) => {}
             Event::TaintCreateV1(e) => {}
@@ -252,13 +263,17 @@ mod tests {
 
     #[test]
     fn test_sqlite_append_event() {
-        let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
+        let storage =
+            SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
         let mut log = BuildEventLog::new(storage);
 
         let want_id = "sqlite_test_1234".to_string();
 
         // Initial state - verify storage is empty
-        let events = log.storage.list_events(0, 100).expect("Failed to list events");
+        let events = log
+            .storage
+            .list_events(0, 100)
+            .expect("Failed to list events");
         assert_eq!(events.len(), 0);
 
         // Verify want doesn't exist in state
         {
             ...
         }
 
         // Append an event
-        let event_id = log.append_event(Event::WantCreateV1(WantCreateEventV1 {
-            want_id: want_id.clone(),
-            root_want_id: "sqlite_root_123".to_string(),
-            parent_want_id: "sqlite_parent_123".to_string(),
-            partitions: vec![PartitionRef {
-                r#ref: "sqlite_partition_1234".to_string(),
-            }],
-            data_timestamp: 0,
-            ttl_seconds: 1,
-            sla_seconds: 1,
-            source: None,
-            comment: None,
-        }))
-        .expect("append_event failed");
+        let event_id = log
+            .append_event(Event::WantCreateV1(WantCreateEventV1 {
+                want_id: want_id.clone(),
+                root_want_id: "sqlite_root_123".to_string(),
+                parent_want_id: "sqlite_parent_123".to_string(),
+                partitions: vec![PartitionRef {
r#ref: "sqlite_partition_1234".to_string(), + }], + data_timestamp: 0, + ttl_seconds: 1, + sla_seconds: 1, + source: None, + comment: None, + })) + .expect("append_event failed"); // Verify event was stored assert!(event_id > 0); // Verify event can be retrieved - let events = log.storage.list_events(0, 100).expect("Failed to list events"); + let events = log + .storage + .list_events(0, 100) + .expect("Failed to list events"); assert_eq!(events.len(), 1); let stored_event = &events[0]; @@ -307,7 +326,10 @@ mod tests { // Verify state was updated let state = log.state.read().expect("couldn't take read lock"); - assert!(state.wants.get(&want_id).is_some(), "want_id not found in state"); + assert!( + state.wants.get(&want_id).is_some(), + "want_id not found in state" + ); assert_eq!( state .wants diff --git a/databuild/job.rs b/databuild/job.rs index 0ea8add..4108e16 100644 --- a/databuild/job.rs +++ b/databuild/job.rs @@ -1,8 +1,7 @@ -use std::error::Error; -use regex::Regex; use crate::job_run::JobRun; use crate::{PartitionRef, WantDetail}; - +use regex::Regex; +use std::error::Error; #[derive(Debug, Clone)] pub struct JobConfiguration { @@ -14,14 +13,15 @@ pub struct JobConfiguration { impl JobConfiguration { /** Launch job to build the partitions specified by the provided wants. */ pub fn spawn(&self, wants: Vec) -> Result> { - let wanted_refs: Vec = wants.iter().flat_map(|want| want.refs.clone()).collect(); + let wanted_refs: Vec = + wants.iter().flat_map(|want| want.refs.clone()).collect(); let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); JobRun::spawn(self.entrypoint.clone(), args) } - + pub fn matches(&self, refs: &PartitionRef) -> bool { - let regex = Regex::new(&self.pattern) - .expect(&format!("Invalid regex pattern: {}", self.pattern)); + let regex = + Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern)); regex.is_match(&refs.r#ref) } } diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index a49f1fa..e07d7c9 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1,17 +1,16 @@ -use std::collections::HashMap; -use std::error::Error; -use std::fmt::Debug; use crate::build_event_log::{BELStorage, BuildEventLog}; use crate::job::JobConfiguration; use crate::job_run::JobRun; use crate::{PartitionRef, WantDetail}; +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Debug; /** Orchestrator turns wants, config, and BEL state into scheduled jobs. 
 It uses lightweight threads + the visitor pattern to monitor job exec progress and liveness, and adds emitted job events to the build event log.
 */
-
 struct Orchestrator<B: BELStorage> {
     bel: BuildEventLog<B>,
     job_runs: Vec<JobRunHandle>,
     config: OrchestratorConfig,
 }
@@ -24,7 +23,12 @@ struct JobRunHandle {
 }
 
 impl From<JobRun> for JobRunHandle {
-    fn from(job_run: JobRun) -> Self { Self { job_run, bel_idx: 0 } }
+    fn from(job_run: JobRun) -> Self {
+        Self {
+            job_run,
+            bel_idx: 0,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -55,17 +59,31 @@ struct GroupedWants {
 }
 
 impl<B: BELStorage> Orchestrator<B> {
-    fn new(storage: B, config: OrchestratorConfig) -> Self { Self { bel: BuildEventLog::new(storage), job_runs: Vec::new(), config } }
+    fn new(storage: B, config: OrchestratorConfig) -> Self {
+        Self {
+            bel: BuildEventLog::new(storage),
+            job_runs: Vec::new(),
+            config,
+        }
+    }
 
     /** Continuously invoked function to watch job run status */
     fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
         // Visit existing jobs, remove completed
         self.job_runs.retain_mut(|jr| {
             // Append emitted events
-            let events = jr.job_run.visit(jr.bel_idx.clone()).expect("Job visit failed");
-            events.iter().filter_map(|event| event.event.clone()).for_each(|event| {
-                self.bel.append_event(event.clone()).expect("Failed to append event");
-            });
+            let events = jr
+                .job_run
+                .visit(jr.bel_idx.clone())
+                .expect("Job visit failed");
+            events
+                .iter()
+                .filter_map(|event| event.event.clone())
+                .for_each(|event| {
+                    self.bel
+                        .append_event(event.clone())
+                        .expect("Failed to append event");
+                });
 
             // Retain job run if it doesn't yet have an exit code (still running)
             jr.job_run.exit_status().is_none()
@@ -77,12 +95,17 @@ impl<B: BELStorage> Orchestrator<B> {
 
     /** Continuously invoked function to watch wants and schedule new jobs */
     fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
         // Collect unhandled wants, group by job that handles each partition,
-        let grouped_wants = Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
+        let grouped_wants =
+            Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
         if !grouped_wants.unhandled_wants.is_empty() {
             // All wants must be mapped to jobs that can be handled
             // TODO we probably want to handle this gracefully in the near future
-            Err(format!("Unable to map following wants: {:?}", &grouped_wants.want_groups).into())
+            Err(format!(
+                "Unable to map following wants: {:?}",
+                &grouped_wants.unhandled_wants
+            )
+            .into())
         } else {
             for wg in grouped_wants.want_groups {
                 let job_run = wg.job.spawn(wg.wants)?;
@@ -106,12 +129,15 @@ impl<B: BELStorage> Orchestrator<B> {
             });
         });
         GroupedWants {
-            want_groups: want_groups.iter().map(|(k, v)| {
-                WantGroup {
-                    job: config.job_configuration_for_label(k).expect(&format!("Job configuration not found for label `{}`", k)),
+            want_groups: want_groups
+                .iter()
+                .map(|(k, v)| WantGroup {
+                    job: config
+                        .job_configuration_for_label(k)
+                        .expect(&format!("Job configuration not found for label `{}`", k)),
                     wants: v.to_owned(),
-                }
-            }).collect(),
+                })
+                .collect(),
             unhandled_wants,
         }
     }
@@ -129,8 +155,8 @@ mod tests {
     mod want_group {
         use super::super::*;
-        use crate::{PartitionRef, WantDetail};
         use crate::build_event_log::MemoryBELStorage;
+        use crate::{PartitionRef, WantDetail};
 
         fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
             JobConfiguration {
                 label: label.to_string(),
                 pattern: pattern.to_string(),
                 entrypoint: "test_entrypoint".to_string(),
             }
         }
@@ -143,9 +169,12 @@ mod tests {
         fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
             WantDetail {
                 want_id: want_id.to_string(),
-                refs: partition_refs.iter().map(|r| PartitionRef {
-                    r#ref: r.to_string(),
-                }).collect(),
+                refs: partition_refs
+                    .iter()
+                    .map(|r| PartitionRef {
+                        r#ref: r.to_string(),
+                    })
+                    .collect(),
             }
         }
@@ -163,7 +192,9 @@ mod tests {
         #[test]
         fn test_group_wants_one_want_matches_job() {
             let job_config = create_job_config("test_job", "partition.*");
-            let config = OrchestratorConfig { jobs: vec![job_config.clone()] };
+            let config = OrchestratorConfig {
+                jobs: vec![job_config.clone()],
+            };
             let want = create_want_detail("want1", vec!["partition1"]);
             let wants = vec![want.clone()];
 
@@ -179,7 +210,9 @@ mod tests {
         #[test]
         fn test_group_wants_one_unmatching_want() {
             let job_config = create_job_config("test_job", "^test_pattern$");
-            let config = OrchestratorConfig { jobs: vec![job_config] };
+            let config = OrchestratorConfig {
+                jobs: vec![job_config],
+            };
             let want = create_want_detail("want1", vec!["different_partition"]);
             let wants = vec![want.clone()];
 
@@ -194,7 +227,9 @@ mod tests {
        #[test]
        fn test_group_wants_multiple_wants_different_jobs() {
            let job_config1 = create_job_config("job1", "pattern1.*");
            let job_config2 = create_job_config("job2", "pattern2.*");
-           let config = OrchestratorConfig { jobs: vec![job_config1, job_config2] };
+           let config = OrchestratorConfig {
+               jobs: vec![job_config1, job_config2],
+           };
            let want1 = create_want_detail("want1", vec!["pattern1_partition"]);
            let want2 = create_want_detail("want2", vec!["pattern1_other"]);
 
@@ -207,12 +242,20 @@ mod tests {
            assert_eq!(result.want_groups.len(), 2);
 
            // Find job1 group
-           let job1_group = result.want_groups.iter().find(|wg| wg.job.label == "job1").unwrap();
+           let job1_group = result
+               .want_groups
+               .iter()
+               .find(|wg| wg.job.label == "job1")
+               .unwrap();
            assert_eq!(job1_group.wants.len(), 2);
 
            // Find job2 group
-           let job2_group = result.want_groups.iter().find(|wg| wg.job.label == "job2").unwrap();
+           let job2_group = result
+               .want_groups
+               .iter()
+               .find(|wg| wg.job.label == "job2")
+               .unwrap();
            assert_eq!(job2_group.wants.len(), 1);
        }
    }
-}
\ No newline at end of file
+}
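
For review context: the patch above is formatting-only, but it touches the orchestrator's whole polling surface, so a sketch of how that API is meant to be driven may help. The following is illustrative only, not code from this commit. It assumes Orchestrator, OrchestratorConfig, JobConfiguration, and MemoryBELStorage are exported publicly (in the patch they are crate-private), and the driver binary, loop cadence, and job values are invented.

// Hypothetical driver for the types reformatted above -- not part of this patch.
use databuild::build_event_log::MemoryBELStorage;
use databuild::job::JobConfiguration;
use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
use std::{thread, time::Duration};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One job claims every partition ref matching its regex; `matches()` in
    // job.rs compiles `pattern` and tests it against each PartitionRef.
    let config = OrchestratorConfig {
        jobs: vec![JobConfiguration {
            label: "daily_metrics".to_string(),         // invented label
            pattern: "daily_.*".to_string(),            // invented pattern
            entrypoint: "./build_daily.sh".to_string(), // invented entrypoint
        }],
    };
    let mut orchestrator = Orchestrator::new(MemoryBELStorage::new(), config);

    loop {
        // Group schedulable wants by matching job and spawn a run per group ...
        orchestrator.poll_wants()?;
        // ... then drain events from live runs into the BEL and drop finished runs.
        orchestrator.poll_jobs()?;
        thread::sleep(Duration::from_millis(500));
    }
}

Keeping poll_wants and poll_jobs as separate fallible steps appears deliberate: scheduling and liveness monitoring can be retried independently by whatever loop drives them.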