Add orchestrator and job run bones
This commit is contained in:
parent
526b826091
commit
97ddb3ae28
7 changed files with 289 additions and 13 deletions
|
|
@ -21,7 +21,10 @@ rust_library(
|
||||||
name = "databuild",
|
name = "databuild",
|
||||||
srcs = [
|
srcs = [
|
||||||
"build_event_log.rs",
|
"build_event_log.rs",
|
||||||
|
"job.rs",
|
||||||
|
"job_run.rs",
|
||||||
"lib.rs",
|
"lib.rs",
|
||||||
|
"orchestrator.rs",
|
||||||
":generate_databuild_rust",
|
":generate_databuild_rust",
|
||||||
],
|
],
|
||||||
edition = "2021",
|
edition = "2021",
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,11 @@ use crate::{BuildState, DataBuildEvent, WantDetail};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use std::fmt::Debug;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
trait BELStorage {
|
pub trait BELStorage {
|
||||||
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
|
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
|
||||||
fn list_events(
|
fn list_events(
|
||||||
&self,
|
&self,
|
||||||
|
|
@ -15,12 +16,17 @@ trait BELStorage {
|
||||||
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
|
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MemoryBELStorage {
|
#[derive(Debug)]
|
||||||
|
pub struct MemoryBELStorage {
|
||||||
events: Vec<DataBuildEvent>,
|
events: Vec<DataBuildEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for MemoryBELStorage {
|
||||||
|
fn default() -> Self { Self::new() }
|
||||||
|
}
|
||||||
|
|
||||||
impl MemoryBELStorage {
|
impl MemoryBELStorage {
|
||||||
fn create() -> MemoryBELStorage {
|
pub fn new() -> MemoryBELStorage {
|
||||||
MemoryBELStorage { events: vec![] }
|
MemoryBELStorage { events: vec![] }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -52,6 +58,7 @@ impl BELStorage for MemoryBELStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct SqliteBELStorage {
|
struct SqliteBELStorage {
|
||||||
connection: Connection,
|
connection: Connection,
|
||||||
}
|
}
|
||||||
|
|
@ -142,20 +149,21 @@ impl BELStorage for SqliteBELStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BuildEventLog<B: BELStorage> {
|
#[derive(Debug)]
|
||||||
storage: B,
|
pub struct BuildEventLog<B: BELStorage + Debug> {
|
||||||
state: Arc<RwLock<BuildState>>,
|
pub storage: B,
|
||||||
|
pub state: Arc<RwLock<BuildState>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BELStorage> BuildEventLog<B> {
|
impl<B: BELStorage + Debug> BuildEventLog<B> {
|
||||||
fn create(storage: B) -> BuildEventLog<B> {
|
pub fn new(storage: B) -> BuildEventLog<B> {
|
||||||
BuildEventLog {
|
BuildEventLog {
|
||||||
storage,
|
storage,
|
||||||
state: Arc::new(Default::default()),
|
state: Arc::new(Default::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
|
pub fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
|
||||||
self.reduce(event.clone())?;
|
self.reduce(event.clone())?;
|
||||||
let idx = self.storage.append_event(event)?;
|
let idx = self.storage.append_event(event)?;
|
||||||
Ok(idx)
|
Ok(idx)
|
||||||
|
|
@ -174,7 +182,7 @@ impl<B: BELStorage> BuildEventLog<B> {
|
||||||
Event::WantCreateV1(e) => {
|
Event::WantCreateV1(e) => {
|
||||||
state
|
state
|
||||||
.wants
|
.wants
|
||||||
.insert(e.want_id.clone(), WantDetail { want_id: e.want_id });
|
.insert(e.want_id.clone(), WantDetail { want_id: e.want_id, refs: e.partitions });
|
||||||
}
|
}
|
||||||
Event::WantCancelV1(e) => {}
|
Event::WantCancelV1(e) => {}
|
||||||
Event::TaintCreateV1(e) => {}
|
Event::TaintCreateV1(e) => {}
|
||||||
|
|
@ -183,6 +191,10 @@ impl<B: BELStorage> BuildEventLog<B> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn schedulable_wants(&self) -> Vec<WantDetail> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
@ -197,8 +209,8 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_append_event() {
|
fn test_append_event() {
|
||||||
let storage = MemoryBELStorage::create();
|
let storage = MemoryBELStorage::new();
|
||||||
let mut log = BuildEventLog::create(storage);
|
let mut log = BuildEventLog::new(storage);
|
||||||
// Initial state
|
// Initial state
|
||||||
assert_eq!(log.storage.events.len(), 0);
|
assert_eq!(log.storage.events.len(), 0);
|
||||||
let want_id = "1234".to_string();
|
let want_id = "1234".to_string();
|
||||||
|
|
@ -241,7 +253,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_sqlite_append_event() {
|
fn test_sqlite_append_event() {
|
||||||
let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
|
let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
|
||||||
let mut log = BuildEventLog::create(storage);
|
let mut log = BuildEventLog::new(storage);
|
||||||
|
|
||||||
let want_id = "sqlite_test_1234".to_string();
|
let want_id = "sqlite_test_1234".to_string();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -133,6 +133,7 @@ message BuildState {
|
||||||
|
|
||||||
message WantDetail {
|
message WantDetail {
|
||||||
string want_id = 1;
|
string want_id = 1;
|
||||||
|
repeated PartitionRef refs = 2;
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
24
databuild/job.rs
Normal file
24
databuild/job.rs
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
use std::error::Error;
|
||||||
|
use crate::job_run::JobRun;
|
||||||
|
use crate::{PartitionRef, WantDetail};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct JobConfiguration {
|
||||||
|
pub label: String,
|
||||||
|
pub pattern: String,
|
||||||
|
pub entrypoint: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobConfiguration {
|
||||||
|
/** Launch job to build the partitions specified by the provided wants. */
|
||||||
|
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
|
||||||
|
let wanted_refs: Vec<PartitionRef> = wants.iter().flat_map(|want| want.refs.clone()).collect();
|
||||||
|
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
|
||||||
|
JobRun::spawn(self.entrypoint.clone(), args)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn matches(&self, refs: &PartitionRef) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
102
databuild/job_run.rs
Normal file
102
databuild/job_run.rs
Normal file
|
|
@ -0,0 +1,102 @@
|
||||||
|
use crate::build_event_log::{BELStorage, MemoryBELStorage};
|
||||||
|
use crate::data_build_event::Event;
|
||||||
|
use crate::{DataBuildEvent, JobRunHeartbeatEventV1};
|
||||||
|
use std::error::Error;
|
||||||
|
use std::io::{BufRead, BufReader};
|
||||||
|
use std::process::{Child, Command, Stdio};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct JobRun {
|
||||||
|
job_run_id: Uuid,
|
||||||
|
events: MemoryBELStorage,
|
||||||
|
child: Child,
|
||||||
|
unhandled_lines: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
const EVENT_SIZE_LIMIT: u64 = 1000000;
|
||||||
|
|
||||||
|
impl JobRun {
|
||||||
|
pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
|
||||||
|
Ok(JobRun {
|
||||||
|
job_run_id: Default::default(),
|
||||||
|
events: Default::default(),
|
||||||
|
child: Command::new(command).args(args).stdout(Stdio::piped())
|
||||||
|
.spawn()?,
|
||||||
|
unhandled_lines: Default::default(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
|
||||||
|
// Collect new lines from child process
|
||||||
|
let stdout = self.child.stdout.take().expect("stdout not piped");
|
||||||
|
let reader = BufReader::new(stdout);
|
||||||
|
for line in reader.lines() {
|
||||||
|
self.unhandled_lines.push(line?);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse BEL events from child process
|
||||||
|
let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
|
||||||
|
for event in new_events { self.events.append_event(event)?; }
|
||||||
|
self.unhandled_lines.drain(..);
|
||||||
|
|
||||||
|
// Return BEL events since provided idx
|
||||||
|
self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| {
|
||||||
|
if events.len() as u64 == EVENT_SIZE_LIMIT {
|
||||||
|
Err(format!("Returned {} events - that's way too many.", EVENT_SIZE_LIMIT).into())
|
||||||
|
} else {
|
||||||
|
Ok(events)
|
||||||
|
}
|
||||||
|
} )
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn event_for_line(line: String) -> Option<Event> {
|
||||||
|
// TODO parse missing data dep event
|
||||||
|
// TODO parse job state
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
|
||||||
|
let mut events: Vec<Event> = Default::default();
|
||||||
|
|
||||||
|
if lines.len() > 0 {
|
||||||
|
// If any lines were written to stdout, we should heartbeat
|
||||||
|
events.push(Event::JobRunHeartbeatV1(
|
||||||
|
JobRunHeartbeatEventV1 { job_run_id: job_run_id.clone().into() }
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in lines.iter().flat_map(|line| Self::event_for_line(line.clone())) {
|
||||||
|
events.push(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
events
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_complete(&mut self) -> bool {
|
||||||
|
self.child.try_wait().expect("Failed to wait on child").is_some()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod tests {
|
||||||
|
use crate::job_run::JobRun;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_process_lines_empty() {
|
||||||
|
let lines = Vec::<String>::new();
|
||||||
|
let events = JobRun::process_lines(Default::default(), &lines);
|
||||||
|
assert_eq!(events.len(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_process_lines_heartbeat() {
|
||||||
|
let lines_1 = vec!("Hello, salem".to_string());
|
||||||
|
let events_1 = JobRun::process_lines(Default::default(), &lines_1);
|
||||||
|
assert_eq!(events_1.len(), 1);
|
||||||
|
|
||||||
|
let lines_2 = vec!("Hello, salem".to_string(), "Hello, pippin".to_string());
|
||||||
|
let events_2 = JobRun::process_lines(Default::default(), &lines_2);
|
||||||
|
assert_eq!(events_2.len(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
mod build_event_log;
|
mod build_event_log;
|
||||||
|
mod orchestrator;
|
||||||
|
mod job_run;
|
||||||
|
mod job;
|
||||||
|
|
||||||
// Include generated protobuf code
|
// Include generated protobuf code
|
||||||
include!("databuild.rs");
|
include!("databuild.rs");
|
||||||
|
|
|
||||||
131
databuild/orchestrator.rs
Normal file
131
databuild/orchestrator.rs
Normal file
|
|
@ -0,0 +1,131 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use crate::build_event_log::{BELStorage, BuildEventLog};
|
||||||
|
use crate::job::JobConfiguration;
|
||||||
|
use crate::job_run::JobRun;
|
||||||
|
use crate::{PartitionRef, WantDetail};
|
||||||
|
|
||||||
|
/**
|
||||||
|
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
|
||||||
|
the visitor pattern to monitor job exec progress and liveness, and adds
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Orchestrator<B: BELStorage + Debug> {
|
||||||
|
bel: BuildEventLog<B>,
|
||||||
|
job_runs: Vec<JobRunHandle>,
|
||||||
|
config: OrchestratorConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct JobRunHandle {
|
||||||
|
job_run: JobRun,
|
||||||
|
bel_idx: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<JobRun> for JobRunHandle {
|
||||||
|
fn from(job_run: JobRun) -> Self { Self { job_run, bel_idx: 0 } }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct OrchestratorConfig {
|
||||||
|
jobs: Vec<JobConfiguration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OrchestratorConfig {
|
||||||
|
fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
|
||||||
|
self.jobs.iter().find(|job| job.label == label).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
|
||||||
|
self.jobs.iter().find(|job| job.matches(pref)).cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct WantGroup {
|
||||||
|
job: JobConfiguration,
|
||||||
|
wants: Vec<WantDetail>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct GroupedWants {
|
||||||
|
want_groups: Vec<WantGroup>,
|
||||||
|
unhandled_wants: Vec<WantDetail>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BELStorage + Debug> Orchestrator<B> {
|
||||||
|
fn new(storage: B, config: OrchestratorConfig) -> Self { Self { bel: BuildEventLog::new(storage), job_runs: Vec::new(), config } }
|
||||||
|
|
||||||
|
/** Continuously invoked function to watch job run status */
|
||||||
|
fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
|
// Visit existing jobs, remove completed
|
||||||
|
self.job_runs.retain_mut(|jr| {
|
||||||
|
// Append emitted events
|
||||||
|
let events = jr.job_run.visit(jr.bel_idx.clone()).expect("Job visit failed");
|
||||||
|
events.iter().filter_map(|event| event.event.clone()).for_each(|event| {
|
||||||
|
self.bel.append_event(event.clone()).expect("Failed to append event");
|
||||||
|
});
|
||||||
|
|
||||||
|
jr.job_run.is_complete()
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Continuously invoked function to watch wants and schedule new jobs */
|
||||||
|
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
|
// Collect unhandled wants, group by job that handles each partition,
|
||||||
|
let grouped_wants = Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
|
||||||
|
|
||||||
|
if !grouped_wants.want_groups.is_empty() {
|
||||||
|
// All wants must be mapped to jobs that can be handled
|
||||||
|
// TODO we probably want to handle this gracefully in the near future
|
||||||
|
Err(format!("Unable to map following wants: {:?}", &grouped_wants.want_groups).into())
|
||||||
|
} else {
|
||||||
|
for wg in grouped_wants.want_groups {
|
||||||
|
let job_run = wg.job.spawn(wg.wants)?;
|
||||||
|
self.job_runs.push(JobRunHandle::from(job_run));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
|
||||||
|
let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
|
||||||
|
let mut unhandled_wants: Vec<WantDetail> = Default::default();
|
||||||
|
wants.iter().for_each(|want| {
|
||||||
|
want.refs.iter().for_each(|pref| {
|
||||||
|
let matched_job = config.match_job_partition(pref);
|
||||||
|
match matched_job {
|
||||||
|
None => unhandled_wants.push(want.clone()),
|
||||||
|
Some(jc) => want_groups.entry(jc.label).or_default().push(want.clone()),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
GroupedWants {
|
||||||
|
want_groups: want_groups.iter().map(|(k, v)| {
|
||||||
|
WantGroup {
|
||||||
|
job: config.job_configuration_for_label(k).expect(&format!("Job configuration not found for label `{}`", k)),
|
||||||
|
wants: v.to_owned(),
|
||||||
|
}
|
||||||
|
}).collect(),
|
||||||
|
unhandled_wants,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Entrypoint for running jobs */
|
||||||
|
pub fn join(mut self) -> Result<(), Box<dyn Error>> {
|
||||||
|
loop {
|
||||||
|
self.poll_jobs()?;
|
||||||
|
self.poll_wants()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod tests {
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue