diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel
index 7ae21b7..2c9cc73 100644
--- a/databuild/BUILD.bazel
+++ b/databuild/BUILD.bazel
@@ -21,7 +21,10 @@ rust_library(
     name = "databuild",
     srcs = [
         "build_event_log.rs",
+        "job.rs",
+        "job_run.rs",
         "lib.rs",
+        "orchestrator.rs",
         ":generate_databuild_rust",
     ],
     edition = "2021",
diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs
index a0aaee6..9d3573f 100644
--- a/databuild/build_event_log.rs
+++ b/databuild/build_event_log.rs
@@ -3,10 +3,11 @@ use crate::{BuildState, DataBuildEvent, WantDetail};
 use prost::Message;
 use rusqlite::Connection;
 use std::error::Error;
+use std::fmt::Debug;
 use std::sync::{Arc, RwLock};
 use std::time::{SystemTime, UNIX_EPOCH};
 
-trait BELStorage {
+pub trait BELStorage {
     fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
     fn list_events(
         &self,
@@ -15,12 +16,17 @@
     ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
 }
 
-struct MemoryBELStorage {
+#[derive(Debug)]
+pub struct MemoryBELStorage {
     events: Vec<DataBuildEvent>,
 }
 
+impl Default for MemoryBELStorage {
+    fn default() -> Self { Self::new() }
+}
+
 impl MemoryBELStorage {
-    fn create() -> MemoryBELStorage {
+    pub fn new() -> MemoryBELStorage {
         MemoryBELStorage { events: vec![] }
     }
 }
@@ -52,6 +58,7 @@ impl BELStorage for MemoryBELStorage {
     }
 }
 
+#[derive(Debug)]
 struct SqliteBELStorage {
     connection: Connection,
 }
@@ -142,20 +149,21 @@ impl BELStorage for SqliteBELStorage {
     }
 }
 
-struct BuildEventLog<B: BELStorage> {
-    storage: B,
-    state: Arc<RwLock<BuildState>>,
+#[derive(Debug)]
+pub struct BuildEventLog<B: BELStorage + Debug> {
+    pub storage: B,
+    pub state: Arc<RwLock<BuildState>>,
 }
 
-impl<B: BELStorage> BuildEventLog<B> {
-    fn create(storage: B) -> BuildEventLog<B> {
+impl<B: BELStorage + Debug> BuildEventLog<B> {
+    pub fn new(storage: B) -> BuildEventLog<B> {
         BuildEventLog {
             storage,
             state: Arc::new(Default::default()),
         }
     }
 
-    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
+    pub fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
         self.reduce(event.clone())?;
         let idx = self.storage.append_event(event)?;
         Ok(idx)
@@ -174,7 +182,7 @@
             Event::WantCreateV1(e) => {
                 state
                     .wants
-                    .insert(e.want_id.clone(), WantDetail { want_id: e.want_id });
+                    .insert(e.want_id.clone(), WantDetail { want_id: e.want_id, refs: e.partitions });
             }
             Event::WantCancelV1(e) => {}
             Event::TaintCreateV1(e) => {}
@@ -183,6 +191,10 @@
 
         Ok(())
     }
+
+    pub fn schedulable_wants(&self) -> Vec<WantDetail> {
+        todo!()
+    }
 }
 
 mod tests {
@@ -197,8 +209,8 @@
 
     #[test]
     fn test_append_event() {
-        let storage = MemoryBELStorage::create();
-        let mut log = BuildEventLog::create(storage);
+        let storage = MemoryBELStorage::new();
+        let mut log = BuildEventLog::new(storage);
         // Initial state
         assert_eq!(log.storage.events.len(), 0);
         let want_id = "1234".to_string();
@@ -241,7 +253,7 @@
 
     #[test]
     fn test_sqlite_append_event() {
         let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
-        let mut log = BuildEventLog::create(storage);
+        let mut log = BuildEventLog::new(storage);
 
         let want_id = "sqlite_test_1234".to_string();
diff --git a/databuild/databuild.proto b/databuild/databuild.proto
index 649603b..08213f8 100644
--- a/databuild/databuild.proto
+++ b/databuild/databuild.proto
@@ -133,6 +133,7 @@ message BuildState {
 
 message WantDetail {
   string want_id = 1;
+  repeated PartitionRef refs = 2; // TODO
 }
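For orientation, a minimal usage sketch of the renamed `BuildEventLog` API, mirroring `test_append_event`. It assumes the create-want payload struct is named `WantCreateEventV1` and carries `want_id` and `partitions` (as its use in `reduce()` suggests); any remaining fields are defaulted. Written as in-crate code, since the modules are private.

```rust
use crate::build_event_log::{BuildEventLog, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::WantCreateEventV1; // assumed name of the WantCreateV1 payload struct

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    let mut log = BuildEventLog::new(MemoryBELStorage::new());
    log.append_event(Event::WantCreateV1(WantCreateEventV1 {
        want_id: "1234".to_string(),
        partitions: vec![],
        ..Default::default()
    }))?;
    // reduce() runs eagerly on append, so derived state reflects the event immediately
    assert!(log.state.read().unwrap().wants.contains_key("1234"));
    Ok(())
}
```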
diff --git a/databuild/job.rs b/databuild/job.rs
new file mode 100644
index 0000000..c1ce6f5
--- /dev/null
+++ b/databuild/job.rs
@@ -0,0 +1,24 @@
+use std::error::Error;
+use crate::job_run::JobRun;
+use crate::{PartitionRef, WantDetail};
+
+#[derive(Debug, Clone)]
+pub struct JobConfiguration {
+    pub label: String,
+    pub pattern: String,
+    pub entrypoint: String,
+}
+
+impl JobConfiguration {
+    /** Launch a job to build the partitions specified by the provided wants. */
+    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
+        let wanted_refs: Vec<PartitionRef> = wants.iter().flat_map(|want| want.refs.clone()).collect();
+        let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
+        JobRun::spawn(self.entrypoint.clone(), args)
+    }
+
+    pub fn matches(&self, pref: &PartitionRef) -> bool {
+        todo!()
+    }
+}
diff --git a/databuild/job_run.rs b/databuild/job_run.rs
new file mode 100644
index 0000000..a776b0b
--- /dev/null
+++ b/databuild/job_run.rs
@@ -0,0 +1,104 @@
+use crate::build_event_log::{BELStorage, MemoryBELStorage};
+use crate::data_build_event::Event;
+use crate::{DataBuildEvent, JobRunHeartbeatEventV1};
+use std::error::Error;
+use std::io::{BufRead, BufReader};
+use std::process::{Child, Command, Stdio};
+use uuid::Uuid;
+
+#[derive(Debug)]
+pub struct JobRun {
+    job_run_id: Uuid,
+    events: MemoryBELStorage,
+    child: Child,
+    unhandled_lines: Vec<String>,
+}
+
+const EVENT_SIZE_LIMIT: u64 = 1_000_000;
+
+impl JobRun {
+    pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
+        Ok(JobRun {
+            job_run_id: Default::default(),
+            events: Default::default(),
+            child: Command::new(command)
+                .args(args)
+                .stdout(Stdio::piped())
+                .spawn()?,
+            unhandled_lines: Default::default(),
+        })
+    }
+
+    pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
+        // Collect new lines from the child process. Borrow stdout rather than
+        // take()-ing it, so repeated visits don't panic on a missing handle.
+        // NOTE: lines() reads to EOF, so this blocks until the child closes stdout.
+        let stdout = self.child.stdout.as_mut().expect("stdout not piped");
+        let reader = BufReader::new(stdout);
+        for line in reader.lines() {
+            self.unhandled_lines.push(line?);
+        }
+
+        // Parse BEL events from child process output
+        let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
+        for event in new_events { self.events.append_event(event)?; }
+        self.unhandled_lines.drain(..);
+
+        // Return BEL events since the provided idx
+        self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| {
+            if events.len() as u64 == EVENT_SIZE_LIMIT {
+                Err(format!("Returned {} events - that's way too many.", EVENT_SIZE_LIMIT).into())
+            } else {
+                Ok(events)
+            }
+        })
+    }
+
+    pub fn event_for_line(_line: String) -> Option<Event> {
+        // TODO parse missing data dep event
+        // TODO parse job state
+        None
+    }
+
+    pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
+        let mut events: Vec<Event> = Default::default();
+
+        if !lines.is_empty() {
+            // If any lines were written to stdout, we should heartbeat
+            events.push(Event::JobRunHeartbeatV1(
+                JobRunHeartbeatEventV1 { job_run_id: job_run_id.into() }
+            ));
+        }
+
+        for event in lines.iter().flat_map(|line| Self::event_for_line(line.clone())) {
+            events.push(event);
+        }
+
+        events
+    }
+
+    pub fn is_complete(&mut self) -> bool {
+        self.child.try_wait().expect("Failed to wait on child").is_some()
+    }
+}
+
+mod tests {
+    use crate::job_run::JobRun;
+
+    #[test]
+    fn test_process_lines_empty() {
+        let lines = Vec::<String>::new();
+        let events = JobRun::process_lines(Default::default(), &lines);
+        assert_eq!(events.len(), 0);
+    }
+
+    #[test]
+    fn test_process_lines_heartbeat() {
+        let lines_1 = vec!["Hello, salem".to_string()];
+        let events_1 = JobRun::process_lines(Default::default(), &lines_1);
+        assert_eq!(events_1.len(), 1);
+
+        let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
+        let events_2 = JobRun::process_lines(Default::default(), &lines_2);
+        assert_eq!(events_2.len(), 1);
+    }
+}
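`JobRun`'s lifecycle is easiest to see end to end. A sketch of driving one by hand from inside the crate, with `echo` standing in for a real job entrypoint; note that `visit()` currently reads the child's stdout to EOF, so each call blocks until the child closes it:

```rust
use crate::job_run::JobRun;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // `echo` is a stand-in for a job entrypoint that writes lines to stdout
    let mut run = JobRun::spawn("echo".to_string(), vec!["hello".to_string()])?;
    let mut idx = 0u64;
    loop {
        // One stdout line yields one JobRunHeartbeatV1 event via process_lines
        let events = run.visit(idx)?;
        idx += events.len() as u64;
        // ...forward `events` into the orchestrator's build event log here...
        if run.is_complete() {
            break;
        }
    }
    Ok(())
}
```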
diff --git a/databuild/lib.rs b/databuild/lib.rs
index a3fd2aa..b7a5000 100644
--- a/databuild/lib.rs
+++ b/databuild/lib.rs
@@ -1,4 +1,7 @@
 mod build_event_log;
+mod orchestrator;
+mod job_run;
+mod job;
 
 // Include generated protobuf code
 include!("databuild.rs");
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
new file mode 100644
index 0000000..898fda6
--- /dev/null
+++ b/databuild/orchestrator.rs
@@ -0,0 +1,137 @@
+use std::collections::HashMap;
+use std::error::Error;
+use std::fmt::Debug;
+use crate::build_event_log::{BELStorage, BuildEventLog};
+use crate::job::JobConfiguration;
+use crate::job_run::JobRun;
+use crate::{PartitionRef, WantDetail};
+
+/**
+Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
+the visitor pattern to monitor job exec progress and liveness, and adds emitted events to the BEL.
+*/
+#[derive(Debug)]
+struct Orchestrator<B: BELStorage + Debug> {
+    bel: BuildEventLog<B>,
+    job_runs: Vec<JobRunHandle>,
+    config: OrchestratorConfig,
+}
+
+#[derive(Debug)]
+struct JobRunHandle {
+    job_run: JobRun,
+    bel_idx: u64,
+}
+
+impl From<JobRun> for JobRunHandle {
+    fn from(job_run: JobRun) -> Self { Self { job_run, bel_idx: 0 } }
+}
+
+#[derive(Debug, Clone)]
+struct OrchestratorConfig {
+    jobs: Vec<JobConfiguration>,
+}
+
+impl OrchestratorConfig {
+    fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
+        self.jobs.iter().find(|job| job.label == label).cloned()
+    }
+
+    fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
+        self.jobs.iter().find(|job| job.matches(pref)).cloned()
+    }
+}
+
+#[derive(Debug, Clone)]
+struct WantGroup {
+    job: JobConfiguration,
+    wants: Vec<WantDetail>,
+}
+
+#[derive(Debug, Clone)]
+struct GroupedWants {
+    want_groups: Vec<WantGroup>,
+    unhandled_wants: Vec<WantDetail>,
+}
+
+impl<B: BELStorage + Debug> Orchestrator<B> {
+    fn new(storage: B, config: OrchestratorConfig) -> Self {
+        Self { bel: BuildEventLog::new(storage), job_runs: Vec::new(), config }
+    }
+
+    /** Continuously invoked function to watch job run status. */
+    fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
+        // Visit existing jobs, remove completed ones
+        self.job_runs.retain_mut(|jr| {
+            // Append emitted events to the BEL
+            let events = jr.job_run.visit(jr.bel_idx).expect("Job visit failed");
+            events.iter().filter_map(|event| event.event.clone()).for_each(|event| {
+                self.bel.append_event(event).expect("Failed to append event");
+            });
+            // Advance the cursor so already-seen events are not re-appended next poll
+            jr.bel_idx += events.len() as u64;
+
+            // retain_mut keeps entries returning true, i.e. still-running jobs
+            !jr.job_run.is_complete()
+        });
+
+        Ok(())
+    }
+
+    /** Continuously invoked function to watch wants and schedule new jobs. */
+    fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
+        // Collect unhandled wants, grouped by the job that handles each partition
+        let grouped_wants = Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
+
+        if !grouped_wants.unhandled_wants.is_empty() {
+            // All wants must be mapped to jobs that can handle them
+            // TODO we probably want to handle this gracefully in the near future
+            Err(format!("Unable to map following wants: {:?}", &grouped_wants.unhandled_wants).into())
+        } else {
+            for wg in grouped_wants.want_groups {
+                let job_run = wg.job.spawn(wg.wants)?;
+                self.job_runs.push(JobRunHandle::from(job_run));
+            }
+
+            Ok(())
+        }
+    }
+
+    fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
+        let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
+        let mut unhandled_wants: Vec<WantDetail> = Default::default();
+        wants.iter().for_each(|want| {
+            want.refs.iter().for_each(|pref| {
+                let matched_job = config.match_job_partition(pref);
+                match matched_job {
+                    None => unhandled_wants.push(want.clone()),
+                    Some(jc) => want_groups.entry(jc.label).or_default().push(want.clone()),
+                }
+            });
+        });
+        GroupedWants {
+            want_groups: want_groups
+                .iter()
+                .map(|(k, v)| WantGroup {
+                    job: config
+                        .job_configuration_for_label(k)
+                        .unwrap_or_else(|| panic!("Job configuration not found for label `{}`", k)),
+                    wants: v.to_owned(),
+                })
+                .collect(),
+            unhandled_wants,
+        }
+    }
+
+    /** Entrypoint for running jobs. */
+    pub fn join(mut self) -> Result<(), Box<dyn Error>> {
+        loop {
+            self.poll_jobs()?;
+            self.poll_wants()?;
+        }
+    }
+}
+
+mod tests {
+
+}
\ No newline at end of file
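The tests module at the bottom is still empty. One case that can run today, despite the `todo!()` in `JobConfiguration::matches`: with no configured jobs, `match_job_partition` never invokes `matches`, so every want should come back unhandled. A sketch of that first test (`PartitionRef`'s full field set isn't shown in this diff, hence the hedged `Default` spread):

```rust
// Would live inside orchestrator.rs's tests module, where the private
// Orchestrator types are reachable via `super`.
use super::{Orchestrator, OrchestratorConfig};
use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};

#[test]
fn test_group_wants_without_jobs() {
    let config = OrchestratorConfig { jobs: vec![] };
    let wants = vec![WantDetail {
        want_id: "w1".to_string(),
        // PartitionRef may have fields beyond `ref`; default the rest
        refs: vec![PartitionRef { r#ref: "data/2024-01-01".to_string(), ..Default::default() }],
    }];
    let grouped = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
    assert!(grouped.want_groups.is_empty());
    assert_eq!(grouped.unhandled_wants.len(), 1);
}
```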