// databuild/databuild/orchestrator.rs

use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::job::JobConfiguration;
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
SubProcessBackend,
};
use crate::{PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads
and the visitor pattern to monitor job exec progress and liveness, and appends the resulting
events (completions, failures, and dep-miss wants) to the build event log.
*/
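/**
A minimal usage sketch (assuming `MemoryBELStorage::default()` as used by `Orchestrator::default`;
the job list is illustrative):

```ignore
let mut orchestrator = Orchestrator::new(MemoryBELStorage::default(), OrchestratorConfig::default())
    .with_jobs(vec![/* JobConfiguration values */]);
orchestrator.join()?; // repeatedly polls job runs and wants until an error occurs
```
*/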
struct Orchestrator<S: BELStorage + Debug> {
bel: BuildEventLog<S>,
not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
dep_miss_jobs: Vec<DepMissJobRun<SubProcessBackend>>,
config: OrchestratorConfig,
}
impl Default for Orchestrator<MemoryBELStorage> {
fn default() -> Self {
Self {
bel: Default::default(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: Default::default(),
}
}
}
impl Orchestrator<MemoryBELStorage> {
    /** Clones the BEL and config into a fresh orchestrator with no tracked job runs. */
    fn copy(&self) -> Self {
Self {
bel: self.bel.clone(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: self.config.clone(),
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn with_config(self, config: OrchestratorConfig) -> Self {
Self { config, ..self }
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self {
config: self.config.with_jobs(jobs),
..self
}
}
fn with_bel(self, bel: BuildEventLog<S>) -> Self {
Self { bel, ..self }
}
}
#[derive(Debug, Clone, Default)]
struct OrchestratorConfig {
    jobs: Vec<JobConfiguration>,
}
impl OrchestratorConfig {
fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.label == label).cloned()
}
fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.matches(pref)).cloned()
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self { jobs }
}
    /** Appends a single job to the existing configuration. */
    fn with_job(self, job: JobConfiguration) -> Self {
        let mut jobs = self.jobs;
        jobs.push(job);
        Self { jobs }
    }
}
#[derive(Debug, Clone)]
struct WantGroup {
job: JobConfiguration,
wants: Vec<WantDetail>,
}
impl WantGroup {
pub fn spawn(&self) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
self.job.spawn(self.wants.clone())
}
}
#[derive(Debug, Clone)]
struct GroupedWants {
want_groups: Vec<WantGroup>,
unhandled_wants: Vec<WantDetail>,
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
not_started_jobs: Vec::new(),
running_jobs: Vec::new(),
completed_jobs: Vec::new(),
failed_jobs: Vec::new(),
dep_miss_jobs: Vec::new(),
config,
}
}
/** Continuously invoked function to watch job run status */
fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
use crate::job_run::JobRunVisitResult;
// First, start any not-started jobs
while let Some(job) = self.not_started_jobs.pop() {
let running = job.run()?;
self.running_jobs.push(running);
}
// Visit running jobs and transition them to terminal states
let mut still_running = Vec::new();
for mut job in self.running_jobs.drain(..) {
match job.visit()? {
JobRunVisitResult::StillRunning => {
still_running.push(job);
}
JobRunVisitResult::Completed(completed) => {
// Emit success event
let event: Event = completed.state.to_event(&completed.id());
self.bel.append_event(&event)?;
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
// Emit failure event
let event: Event = failed.state.to_event(&failed.id());
self.bel.append_event(&event)?;
self.failed_jobs.push(failed);
}
JobRunVisitResult::DepMiss(dep_miss) => {
for event in dep_miss_to_events(&self.bel.state, &dep_miss)? {
self.bel.append_event(&event)?;
}
// Record missing upstream status in want details
self.dep_miss_jobs.push(dep_miss);
}
}
}
self.running_jobs = still_running;
Ok(())
}
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
        // Collect schedulable wants and group them by the job that handles each partition
let schedulability = self.bel.state.schedulable_wants();
println!("schedulability: {:?}", schedulability);
        let schedulable_wants: Vec<WantDetail> = schedulability
            .0
            .iter()
            .filter_map(|ws| ws.is_schedulable().then(|| ws.want.clone()))
            .collect();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
println!("grouped wants: {:?}", grouped_wants);
if !grouped_wants.unhandled_wants.is_empty() {
            // Every want must map to a job that can handle it
// TODO we probably want to handle this gracefully in the near future
Err(format!(
"Unable to map following wants: {:?}",
&grouped_wants.unhandled_wants
)
.into())
} else {
for wg in grouped_wants.want_groups {
self.not_started_jobs.push(wg.spawn()?);
}
Ok(())
}
}
    fn group_wants(config: &OrchestratorConfig, wants: &[WantDetail]) -> GroupedWants {
let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
let mut unhandled_wants: Vec<WantDetail> = Default::default();
wants.iter().for_each(|want| {
want.partitions.iter().for_each(|pref| {
let matched_job = config.match_job_partition(pref);
match matched_job {
None => unhandled_wants.push(want.clone()),
Some(jc) => want_groups.entry(jc.label).or_default().push(want.clone()),
}
});
});
        GroupedWants {
            want_groups: want_groups
                .into_iter()
                .map(|(k, v)| WantGroup {
                    job: config
                        .job_configuration_for_label(&k)
                        .unwrap_or_else(|| panic!("Job configuration not found for label `{k}`")),
                    wants: v,
                })
                .collect(),
            unhandled_wants,
        }
}
fn step(&mut self) -> Result<(), Box<dyn Error>> {
self.poll_job_runs()?;
self.poll_wants()?;
Ok(())
}
    /** Entrypoint for running jobs: steps the orchestrator in a loop until an error occurs. */
pub fn join(&mut self) -> Result<(), Box<dyn Error>> {
loop {
self.step()?
}
}
}
fn dep_miss_to_events(
bel_state: &BuildState,
dep_miss: &DepMissJobRun<SubProcessBackend>,
) -> Result<Vec<Event>, Box<dyn Error>> {
let mut events = vec![];
// Append literal job run dep miss
events.push(dep_miss.state.to_event(&dep_miss.id()));
// Append wants from dep miss
    let job_run_detail = bel_state
        .get_job_run(&dep_miss.job_run_id.to_string())
        .ok_or_else(|| format!("Unable to find job run with id `{}`", dep_miss.job_run_id))?;
// Infer data/SLA timestamps from upstream want
let want_timestamps: WantTimestamps = job_run_detail
.servicing_wants
.iter()
.flat_map(|wap| bel_state.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
        .ok_or("No servicing wants found")?;
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
dep_miss.state.missing_deps.clone(),
&dep_miss.job_run_id,
want_timestamps,
);
events.extend(want_events);
Ok(events)
}
#[cfg(test)]
mod tests {
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use crate::WantCreateEventV1;
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
Orchestrator::default()
}
impl WantCreateEventV1 {
/// A naive random want for testing purposes
pub fn sample() -> Self {
Self {
want_id: Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: current_timestamp(),
ttl_seconds: 1000,
sla_seconds: 1000,
source: None,
comment: Some("test want".to_string()),
}
}
}
/// Scenario 1
    /// A test scenario that simulates a databuild application with 2 jobs, alpha and beta, with
    /// beta depending on a single output from alpha, and alpha with no deps.
fn setup_scenario_a_to_b(
mut orchestrator: Orchestrator<MemoryBELStorage>,
) -> Orchestrator<MemoryBELStorage> {
// Define test jobs
orchestrator.config = OrchestratorConfig {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
pattern: "data/alpha".to_string(),
entry_point: MockJobRun::bin_path(),
},
JobConfiguration {
label: "beta".to_string(),
pattern: "data/beta".to_string(),
entry_point: MockJobRun::bin_path(),
},
],
};
orchestrator
}
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
#[test]
#[ignore] // TODO define this interface
fn test_spawned_process_job() {
todo!()
}
}
// The orchestrator relies on polling job run status to react to job completions that imply
// key outcomes like:
// - Success: partitions produced, other job runs may be schedulable
// - Dep miss: wants need to be created
// - Failure: engineer likely needs to react
mod poll_job_runs {
// Use case: we find a job that has completed, BEL should be written with appropriate event
// (both for success and fail cases)
#[test]
#[ignore]
fn test_job_completion_events() {
todo!()
}
        // Use case: a job has written new stdout; it should produce a new heartbeat event in the BEL
// TODO - we should come back here later and ensure we have a minimum heartbeat period
#[test]
#[ignore]
fn test_heartbeat_from_stdout() {
todo!()
}
}
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::WantCreateEventV1;
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
fn test_empty_wants_noop() {
let mut orchestrator = build_orchestrator();
// Should init with no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
orchestrator
.poll_wants()
.expect("shouldn't fail to poll empty wants");
// Should still be empty since no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
// (but in this case using a noop/mock child process)
#[test]
fn test_schedulable_wants_should_schedule() {
// Given
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
let events = vec![Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
assert_eq!(orchestrator.not_started_jobs.len(), 0);
// When
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll wants");
// Should schedule alpha job
assert_eq!(orchestrator.not_started_jobs.len(), 1);
            assert_eq!(
                orchestrator.not_started_jobs.first().unwrap().state.args,
                vec!["data/alpha"],
                "should have scheduled alpha job"
            )
}
// Use case: A schedulable want that can't be matched to a job should return an error
        #[test]
        fn test_schedulable_want_no_matching_job() {
            // Given an orchestrator with no configured jobs and a pending want
            let mut orchestrator = build_orchestrator();
            let events = vec![Event::WantCreateV1(WantCreateEventV1 {
                partitions: vec!["data/alpha".into()],
                ..WantCreateEventV1::sample()
            })];
            for e in events {
                orchestrator.bel.append_event(&e).expect("append");
            }
            // When polling, the unmatched want should surface as an error
            assert!(orchestrator.poll_wants().is_err());
            // Should not have scheduled any jobs
            assert_eq!(orchestrator.not_started_jobs.len(), 0);
        }
}
    /// Orchestrator want creation is the means of data dependency propagation: it lets the
    /// orchestrator create wants for the partitions needed by the jobs that produce the
    /// already-wanted partitions.
mod want_create {
use crate::data_build_event::Event;
use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
use crate::orchestrator::dep_miss_to_events;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{JobRunDetail, MissingDeps, WantAttributedPartitions, WantCreateEventV1};
use std::marker::PhantomData;
use uuid::Uuid;
        /// Use case: The orchestrator should map a dep-missed job run into a set of wants
#[test]
fn test_job_fail_want_mapping() {
            // Given a scenario with jobs alpha and beta
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// Add event for originating want
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/beta".into()],
..WantCreateEventV1::sample()
}))
.expect("event append");
// Create failed job run detail
let job_run_id = Uuid::new_v4();
let job_run = JobRunDetail {
servicing_wants: orchestrator
.bel
.state
.wants
.values()
.map(|w| WantAttributedPartitions {
want_id: w.want_id.clone(),
partitions: w.partitions.clone(),
})
.collect(),
..JobRunDetail::default()
};
orchestrator
.bel
.state
.job_runs
.insert(job_run_id.into(), job_run);
            // Build the dep-missed job run
let dep_miss_job_run = DepMissJobRun {
job_run_id,
state: SubProcessDepMiss {
stdout_buffer: vec![],
missing_deps: vec![MissingDeps {
impacted: vec!["data/beta".into()],
missing: vec!["data/alpha".into()],
}],
read_deps: vec![],
},
_backend: PhantomData,
};
// When calculating events from dep miss
let events = dep_miss_to_events(&orchestrator.bel.state, &dep_miss_job_run).unwrap();
            // Should have created a want for alpha
assert_eq!(
events
.iter()
.filter(|e| match e {
Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
_ => false,
})
.count(),
1
);
assert!(
orchestrator.not_started_jobs.is_empty(),
"shouldn't have scheduled yet"
);
// Should schedule job after we poll wants
orchestrator.poll_wants().expect("poll wants");
assert_eq!(
orchestrator.not_started_jobs.len(),
1,
"should have scheduled job"
);
}
}
    /// The orchestrator needs to achieve high-level, end-to-end orchestration use cases.
mod orchestration {
/// Use case: should run a job to produce a partition in reaction to a want, then have the
/// want fulfilled.
#[test]
#[ignore]
fn test_want_builds_partition() {
todo!()
}
// Use case: a graph with multi-hop deps should achieve the multi-hop build
// - Job B depends on part_a produced by job A
// - Job B should be attempted, fail, and create a want for part_a
// - Job A should be attempted, succeed, and produce part_a
// - Job B should be attempted, succeed, and produce part_b
#[test]
#[ignore]
fn test_multi_hop_want_builds_partition() {
todo!()
}
}
// The orchestrator groups wants to enable efficient execution. Many individual wants may
// reference the same partitions, or many different partitions may be referenced by many
// different wants. The orchestrator needs to be able to achieve job run batching, where a
// single job run builds multiple partitions from multiple different wants.
mod want_grouping {
use super::super::*;
use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration {
label: label.to_string(),
pattern: pattern.to_string(),
entry_point: "test_entrypoint".to_string(),
}
}
fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
WantDetail {
want_id: want_id.to_string(),
partitions: partition_refs
.iter()
.map(|r| PartitionRef {
r#ref: r.to_string(),
})
.collect(),
upstreams: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
status: None,
last_updated_timestamp: 0,
}
}
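        // A small sketch exercising the OrchestratorConfig builder methods from above:
        // with_jobs replaces the whole job set, while with_job appends a single job.
        // Labels and patterns here are illustrative only.
        #[test]
        fn test_config_builders() {
            let config = OrchestratorConfig::default()
                .with_jobs(vec![create_job_config("job1", "pattern1.*")])
                .with_job(create_job_config("job2", "pattern2.*"));
            assert_eq!(config.jobs.len(), 2);
            assert_eq!(
                config.job_configuration_for_label("job2").unwrap().label,
                "job2"
            );
        }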
#[test]
fn test_group_wants_empty_config_empty_wants() {
let config = OrchestratorConfig { jobs: vec![] };
let wants = vec![];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.want_groups.is_empty());
assert!(result.unhandled_wants.is_empty());
}
#[test]
fn test_group_wants_one_want_matches_job() {
let job_config = create_job_config("test_job", "partition.*");
let config = OrchestratorConfig {
jobs: vec![job_config.clone()],
};
let want = create_want_detail("want1", vec!["partition1"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 1);
assert_eq!(result.want_groups[0].job.label, "test_job");
assert_eq!(result.want_groups[0].wants.len(), 1);
assert_eq!(result.want_groups[0].wants[0].want_id, "want1");
}
#[test]
fn test_group_wants_one_unmatching_want() {
let job_config = create_job_config("test_job", "^test_pattern$");
let config = OrchestratorConfig {
jobs: vec![job_config],
};
let want = create_want_detail("want1", vec!["different_partition"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert_eq!(result.unhandled_wants.len(), 1);
assert_eq!(result.unhandled_wants[0].want_id, "want1");
assert!(result.want_groups.is_empty());
}
#[test]
fn test_group_wants_multiple_wants_different_jobs() {
let job_config1 = create_job_config("job1", "pattern1.*");
let job_config2 = create_job_config("job2", "pattern2.*");
let config = OrchestratorConfig {
jobs: vec![job_config1, job_config2],
};
let want1 = create_want_detail("want1", vec!["pattern1_partition"]);
let want2 = create_want_detail("want2", vec!["pattern1_other"]);
let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
let wants = vec![want1, want2, want3];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 2);
// Find job1 group
let job1_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job1")
.unwrap();
assert_eq!(job1_group.wants.len(), 2);
// Find job2 group
let job2_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job2")
.unwrap();
assert_eq!(job2_group.wants.len(), 1);
}
}
}