// databuild/databuild/orchestrator.rs

use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::SubProcessBackend;
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::fmt::Debug;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
the visitor pattern to monitor job execution progress and liveness.

JTBDs (jobs to be done):
- Orchestrator turns job run dep-miss failures into derivative wants for the missed partitions
- Orchestrator turns schedulable wants into job runs to build the requested partitions
- Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
jobs when possible
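
A minimal usage sketch (assumes a `job_config: JobConfiguration` defined elsewhere; not
compiled as a doctest):

```ignore
let mut orchestrator = Orchestrator::default().with_jobs(vec![job_config]);
// Each step polls job runs (starting queued ones) and then queues jobs for schedulable wants.
orchestrator.step()?;
```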
*/
struct Orchestrator<S: BELStorage + Debug> {
pub bel: BuildEventLog<S>,
pub config: OrchestratorConfig,
pub job_runs: Vec<crate::job_run::JobRun<SubProcessBackend>>,
}
impl Default for Orchestrator<MemoryBELStorage> {
fn default() -> Self {
Self {
bel: Default::default(),
config: Default::default(),
job_runs: Default::default(),
}
}
}
impl Orchestrator<MemoryBELStorage> {
fn copy(&self) -> Self {
Self {
bel: self.bel.clone(),
config: self.config.clone(),
job_runs: Default::default(),
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn with_config(self, config: OrchestratorConfig) -> Self {
Self { config, ..self }
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self {
config: self.config.with_jobs(jobs),
..self
}
}
fn with_bel(self, bel: BuildEventLog<S>) -> Self {
Self { bel, ..self }
}
}
#[derive(Debug, Clone, Default)]
struct OrchestratorConfig {
jobs: Vec<JobConfiguration>,
}
impl OrchestratorConfig {
fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.label == label).cloned()
}
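/// Finds the first configured job whose partition patterns match the given partition ref.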
fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.matches(pref)).cloned()
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self { jobs }
}
fn with_job(self, job: JobConfiguration) -> Self {
Self { jobs: vec![job] }
}
}
#[derive(Debug, Clone)]
struct WantGroup {
job: JobConfiguration,
wants: Vec<WantDetail>,
}
#[derive(Debug, Clone)]
struct GroupedWants {
want_groups: Vec<WantGroup>,
unhandled_wants: Vec<WantDetail>,
}
impl GroupedWants {
pub fn validate(&self) -> Result<(), DatabuildError> {
if !self.unhandled_wants.is_empty() {
// Every want must map to a job that can handle it
// TODO we probably want to handle this gracefully in the near future
Err(format!("Unable to map the following wants to jobs: {:?}", self.unhandled_wants).into())
} else {
Ok(())
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
config,
job_runs: Vec::new(),
}
}
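/// Starts every queued (`NotStarted`) job run, transitioning it to `Running`; runs in all other states pass through unchanged.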
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::JobRun;
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?),
other => other, // Pass through all other states unchanged
};
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
Ok(())
}
/// Visits each job run, appending resulting events to the BEL and transitioning the run's state
/// in place: runs either stay running or move to a terminal state (completed, failed, or dep miss).
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::{JobRun, VisitResult};
self.schedule_queued_jobs()?;
// Visit all running jobs using type-safe transitions
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::Running(running) => {
match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
JobRun::Running(still_running)
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
JobRun::DepMiss(dep_miss)
}
}
}
other => other, // Pass through all non-running states unchanged
};
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
Ok(())
}
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), DatabuildError> {
// Collect unhandled wants, group by job that handles each partition,
let schedulable_wants = self.bel.state.wants_schedulability().schedulable_wants();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
grouped_wants.validate()?;
// Spawn jobs and add events
for wg in grouped_wants.want_groups {
self.queue_job(wg)?;
}
Ok(())
}
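/// Groups schedulable wants by the job whose partition patterns match each wanted partition;
/// a partition with no matching job sends its want to `unhandled_wants`.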
fn group_wants(config: &OrchestratorConfig, wants: &[WantDetail]) -> GroupedWants {
let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
let mut unhandled_wants: Vec<WantDetail> = Default::default();
wants.iter().for_each(|want| {
want.partitions.iter().for_each(|pref| {
let matched_job = config.match_job_partition(pref);
match matched_job {
None => unhandled_wants.push(want.clone()),
Some(jc) => want_groups.entry(jc.label).or_default().push(want.clone()),
}
});
});
GroupedWants {
want_groups: want_groups
.iter()
.map(|(k, v)| WantGroup {
job: config
.job_configuration_for_label(k)
.unwrap_or_else(|| panic!("Job configuration not found for label `{}`", k)),
wants: v.to_owned(),
})
.collect(),
unhandled_wants,
}
}
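/// Creates a not-yet-started job run for a want group, appends a `JobRunBufferV1` event to the
/// BEL, and queues the run for scheduling.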
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
use crate::job_run::JobRun;
// Compute args from wants the same way JobConfiguration::spawn() does
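// e.g. wants covering ["data/alpha"] and ["data/beta"] yield args ["data/alpha", "data/beta"]; refs are not deduplicated here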
let wanted_refs: Vec<crate::PartitionRef> =
wg.wants.iter().flat_map(|want| want.partitions.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRun::spawn(wg.job.entry_point.clone(), args);
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id().to_string(),
job_label: wg.job.label,
building_partitions: wg
.wants
.iter()
.flat_map(|w| w.partitions.clone())
.collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.job_runs.push(job_run);
Ok(())
}
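/// One orchestration tick: polls running job runs first, then polls wants to queue newly schedulable jobs.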
fn step(&mut self) -> Result<(), DatabuildError> {
self.poll_job_runs()?;
self.poll_wants()?;
Ok(())
}
// Helper methods for tests to count jobs by state
#[cfg(test)]
fn count_running_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Running(_))).count()
}
#[cfg(test)]
fn count_terminal_jobs(&self) -> usize {
self.job_runs.iter().filter(|j| j.is_terminal()).count()
}
#[cfg(test)]
fn count_not_started_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::NotStarted(_))).count()
}
#[cfg(test)]
fn count_dep_miss_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::DepMiss(_))).count()
}
#[cfg(test)]
fn count_completed_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Completed(_))).count()
}
/** Entrypoint for driving the orchestrator: loops on `step()`, polling job runs and wants until an error occurs */
pub fn join(&mut self) -> Result<(), DatabuildError> {
loop {
self.step()?
}
}
}
#[cfg(test)]
mod tests {
use crate::WantCreateEventV1;
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
Orchestrator::default()
}
impl WantCreateEventV1 {
/// A naive random want for testing purposes
pub fn sample() -> Self {
Self {
want_id: Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: current_timestamp(),
ttl_seconds: 1000,
sla_seconds: 1000,
source: None,
comment: Some("test want".to_string()),
}
}
}
/// Scenario 1
/// A test scenario that simulates a databuild application with 2 jobs, alpha and beta, with
/// alpha depending on a single output from beta, and beta with no deps.
fn setup_scenario_a_to_b(
mut orchestrator: Orchestrator<MemoryBELStorage>,
) -> Orchestrator<MemoryBELStorage> {
// Define test jobs
orchestrator.config = OrchestratorConfig {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: MockJobRun::bin_path(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: MockJobRun::bin_path(),
},
],
};
orchestrator
}
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
#[test]
#[ignore] // TODO define this interface
fn test_spawned_process_job() {
todo!()
}
}
// The orchestrator relies on polling job run status to react to job completions that imply
// key outcomes like:
// - Success: partitions produced, other job runs may be schedulable
// - Dep miss: wants need to be created
// - Failure: engineer likely needs to react
mod poll_job_runs {
// Use case: we find a job that has completed, BEL should be written with appropriate event
// (both for success and fail cases)
#[test]
#[ignore]
fn test_job_completion_events() {
todo!()
}
// Use case: a job has written new stdout; it should produce a new heartbeat event in the BEL
// TODO - we should come back here later and ensure we have a minimum heartbeat period
#[test]
#[ignore]
fn test_heartbeat_from_stdout() {
todo!()
}
}
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::WantCreateEventV1;
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
fn test_empty_wants_noop() {
let mut orchestrator = build_orchestrator();
// Should init with no work to do
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_running_jobs(), 0);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll empty wants");
// Should still be empty since no work to do
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_running_jobs(), 0);
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
// (but in this case using a noop/mock child process)
#[test]
fn test_schedulable_wants_should_schedule() {
// Given
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
let events = vec![Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 0);
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
// When
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 1);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll wants");
// Should schedule alpha job
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// Verify the job has the right args by checking the first NotStarted job
use crate::job_run::JobRun;
let not_started_job = orchestrator.job_runs.iter().find(|j| matches!(j, JobRun::NotStarted(_))).unwrap();
if let JobRun::NotStarted(job) = not_started_job {
assert_eq!(job.state.args, vec!["data/alpha"], "should have scheduled alpha job");
}
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
}
// Use case: A schedulable want that can't be matched to a job should surface an error
#[test]
fn test_schedulable_want_no_matching_job() {
// Given an orchestrator with no configured jobs
let mut orchestrator = build_orchestrator();
let events = vec![Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
// When: polling wants should fail because no job matches the wanted partition
assert!(orchestrator.poll_wants().is_err(), "unmatched want should error");
// Should not have scheduled any jobs
assert_eq!(orchestrator.count_not_started_jobs(), 0);
}
}
/// Orchestrator want creation is the means of data dependency propagation: it lets the
/// orchestrator create wants for the partitions needed by the jobs that produce the already
/// wanted partitions.
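/// For example, a job run building `data/beta` that dep-misses on `data/alpha` should result in
/// a new want for `data/alpha`, attributed back to the originating want.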
mod want_create {
// /// Use case: The orchestrator should map a failed job into a set of wants
// #[test]
// fn test_job_fail_want_mapping() {
// // Given a
// let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// // Add event for originating want
// let want_create = WantCreateEventV1::sample();
// let building_partitions = vec!["data/beta".into()];
// orchestrator
// .bel
// .append_event(&Event::WantCreateV1(WantCreateEventV1 {
// partitions: building_partitions.clone(),
// ..want_create.clone()
// }))
// .expect("event append");
// // Create failed job run detail
// let want_attributed_partitions: Vec<WantAttributedPartitions> =
// vec![want_create.clone().into()];
// let job_run_id = Uuid::new_v4();
// let job_run = JobRunBufferEventV1 {
// job_run_id: job_run_id.into(),
// building_partitions: building_partitions.clone(),
// want_attributed_partitions: want_attributed_partitions.clone(),
// ..JobRunBufferEventV1::default()
// };
// orchestrator
// .bel
// .append_event(&Event::JobRunBufferV1(job_run))
// .expect("event append");
//
// // Job runs should not be empty
// orchestrator
// .bel
// .state
// .get_job_run(&job_run_id.to_string())
// .expect("job run should exist");
//
// // Add event for job failure
// let dep_miss_job_run = DepMissJobRun {
// job_run_id,
// state: SubProcessDepMiss {
// stdout_buffer: vec![],
// missing_deps: vec![MissingDeps {
// impacted: vec!["data/beta".into()],
// missing: vec!["data/alpha".into()],
// }],
// read_deps: vec![],
// },
// _backend: PhantomData,
// };
//
// // When calculating events from dep miss
// // TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
// let dep_miss_event = dep_miss_job_run.state.to_event(&dep_miss_job_run.id());;
// let events = orchestrator
// .bel
// .state
// .handle_job_run_dep_miss(&dep_miss_event)
// .unwrap();
//
// // Should have scheduled a job for alpha
// assert_eq!(
// events
// .iter()
// .filter(|e| match e {
// Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
// _ => false,
// })
// .count(),
// 1
// );
// assert!(
// orchestrator.not_started_jobs.is_empty(),
// "shouldn't have scheduled yet"
// );
//
// // Should schedule job after we poll wants
// orchestrator.poll_wants().expect("poll wants");
// assert_eq!(
// orchestrator.not_started_jobs.len(),
// 1,
// "should have scheduled job"
// );
// }
}
/// Orchestrator needs to achieve high-level orchestration use cases end to end (want -> job run -> live partition).
mod orchestration {
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{PartitionStatusCode, WantCreateEventV1};
use std::thread;
use std::time::Duration;
/// Use case: should run a job to produce a partition in reaction to a want, then have the
/// want fulfilled.
#[test]
fn test_want_builds_partition() {
// Given
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// Add event for originating want
let partition = "data/alpha";
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: vec![partition.into()],
..WantCreateEventV1::sample()
}))
.expect("event append");
// When
// Poll wants then schedule pending jobs
orchestrator
.poll_wants()
.expect("stage unscheduled jobs based on wants failed");
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// step should start job run
orchestrator.step().expect("should start run");
assert_eq!(orchestrator.count_running_jobs(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator
.step()
.expect("should still be running");
assert_eq!(orchestrator.count_running_jobs(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
// Wait for it to complete
thread::sleep(Duration::from_millis(10));
orchestrator
.step()
.expect("should be able to poll existing job run");
// Job run should have succeeded
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_completed_jobs(), 1);
// Build state should show partition as live
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"partition should be live after job run completion"
);
}
/// Helper to wait for running jobs to complete with timeout
fn wait_for_jobs_to_complete<S: crate::build_event_log::BELStorage + std::fmt::Debug>(
orchestrator: &mut crate::orchestrator::Orchestrator<S>,
max_steps: usize,
) -> Result<(), String> {
use std::thread;
use std::time::Duration;
for _i in 0..max_steps {
thread::sleep(Duration::from_millis(50));
if orchestrator.count_running_jobs() == 0 {
return Ok(());
}
orchestrator
.step()
.map_err(|e| format!("step failed: {}", e))?;
}
Err(format!("Jobs did not complete after {} steps", max_steps))
}
// Use case: a graph with multi-hop deps should achieve the multi-hop build
// - Job B depends on part_a produced by job A
// - Job B should be attempted, fail, and create a want for part_a
// - Job A should be attempted, succeed, and produce part_a
// - Job B should be attempted, succeed, and produce part_b
#[test]
fn test_multi_hop_want_builds_partition() {
use crate::job::JobConfiguration;
use crate::orchestrator::OrchestratorConfig;
use std::fs;
use std::os::unix::fs::PermissionsExt;
// Clean up marker file from any previous runs
let marker_file = "/tmp/databuild_test_alpha_complete";
let _ = fs::remove_file(marker_file);
// Create inline test scripts in /tmp
let alpha_script = "/tmp/test_job_alpha.sh";
let beta_script = "/tmp/test_job_beta.sh";
// Alpha job: creates marker file and outputs success
fs::write(
alpha_script,
r#"#!/bin/bash
touch /tmp/databuild_test_alpha_complete
echo '{"DataDepLogLine":{"Success":{"version":"1","produced_partitions":["data/alpha"]}}}'
"#,
)
.unwrap();
// Beta job: checks for alpha marker, outputs dep miss if not found
fs::write(beta_script, r#"#!/bin/bash
if [ ! -f /tmp/databuild_test_alpha_complete ]; then
echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
exit 1
fi
echo 'Beta succeeded'
"#).unwrap();
// Make scripts executable
fs::set_permissions(alpha_script, fs::Permissions::from_mode(0o755)).unwrap();
fs::set_permissions(beta_script, fs::Permissions::from_mode(0o755)).unwrap();
// Given: Set up orchestrator with alpha and beta jobs using test scripts
let mut orchestrator = build_orchestrator();
orchestrator.config = OrchestratorConfig {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: alpha_script.to_string(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: beta_script.to_string(),
},
],
};
let partition_beta = "data/beta";
let partition_alpha = "data/alpha";
// Create initial want for beta partition
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: vec![partition_beta.into()],
..WantCreateEventV1::sample()
}))
.expect("event append");
// When: Run orchestrator steps to let it naturally handle the multi-hop build
// Step 1: Should schedule beta job (want -> NotStarted job run)
orchestrator.step().expect("step 1");
assert_eq!(
orchestrator.count_not_started_jobs(),
1,
"beta job should be queued"
);
// Step 2: Should start beta job (NotStarted -> Running)
orchestrator.step().expect("step 2");
assert_eq!(
orchestrator.count_running_jobs(),
1,
"beta job should be running"
);
// Step 3: Beta job detects missing alpha dep and creates want
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// (Beta should now be in dep_miss state, and a want for alpha should be created)
assert_eq!(
orchestrator.count_dep_miss_jobs(),
1,
"beta should have dep miss"
);
// Step 4: Should schedule and start alpha job
// (dep miss handler created the alpha want, which will be picked up by poll_wants)
orchestrator.step().expect("step 4");
assert_eq!(
orchestrator.count_running_jobs(),
1,
"alpha job should be running"
);
// Step 5: Alpha completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
assert_eq!(
orchestrator.count_completed_jobs(),
1,
"alpha should complete"
);
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_alpha)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
// Step 6: Beta is rescheduled and started (want -> Running job run)
orchestrator.step().expect("step 7");
assert_eq!(orchestrator.count_running_jobs(), 1, "beta should be running");
// Step 7: Beta completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// Then: Verify both partitions are live and both jobs completed
assert_eq!(
orchestrator.count_completed_jobs(),
2,
"both jobs should complete"
);
assert_eq!(
orchestrator.count_dep_miss_jobs(),
1,
"should have one dep miss"
);
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_alpha)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_beta)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"beta partition should be live after multi-hop build"
);
// Cleanup
let _ = fs::remove_file(marker_file);
let _ = fs::remove_file(alpha_script);
let _ = fs::remove_file(beta_script);
}
}
// The orchestrator groups wants to enable efficient execution. Many individual wants may
// reference the same partitions, or many different partitions may be referenced by many
// different wants. The orchestrator needs to be able to achieve job run batching, where a
// single job run builds multiple partitions from multiple different wants.
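// For example, multiple wants matching the same job's pattern (see
// test_group_wants_multiple_wants_different_jobs below) are grouped into a single WantGroup for that job.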
mod want_grouping {
use super::super::*;
use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration {
label: label.to_string(),
patterns: vec![pattern.to_string()],
entry_point: "test_entrypoint".to_string(),
}
}
fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
WantDetail {
want_id: want_id.to_string(),
partitions: partition_refs
.iter()
.map(|r| PartitionRef {
r#ref: r.to_string(),
})
.collect(),
upstreams: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
status: None,
last_updated_timestamp: 0,
}
}
#[test]
fn test_group_wants_empty_config_empty_wants() {
let config = OrchestratorConfig { jobs: vec![] };
let wants = vec![];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.want_groups.is_empty());
assert!(result.unhandled_wants.is_empty());
}
#[test]
fn test_group_wants_one_want_matches_job() {
let job_config = create_job_config("test_job", "partition.*");
let config = OrchestratorConfig {
jobs: vec![job_config.clone()],
};
let want = create_want_detail("want1", vec!["partition1"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 1);
assert_eq!(result.want_groups[0].job.label, "test_job");
assert_eq!(result.want_groups[0].wants.len(), 1);
assert_eq!(result.want_groups[0].wants[0].want_id, "want1");
}
#[test]
fn test_group_wants_one_unmatching_want() {
let job_config = create_job_config("test_job", "^test_pattern$");
let config = OrchestratorConfig {
jobs: vec![job_config],
};
let want = create_want_detail("want1", vec!["different_partition"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert_eq!(result.unhandled_wants.len(), 1);
assert_eq!(result.unhandled_wants[0].want_id, "want1");
assert!(result.want_groups.is_empty());
}
#[test]
fn test_group_wants_multiple_wants_different_jobs() {
let job_config1 = create_job_config("job1", "pattern1.*");
let job_config2 = create_job_config("job2", "pattern2.*");
let config = OrchestratorConfig {
jobs: vec![job_config1, job_config2],
};
let want1 = create_want_detail("want1", vec!["pattern1_partition"]);
let want2 = create_want_detail("want2", vec!["pattern1_other"]);
let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
let wants = vec![want1, want2, want3];
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 2);
// Find job1 group
let job1_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job1")
.unwrap();
assert_eq!(job1_group.wants.len(), 2);
// Find job2 group
let job2_group = result
.want_groups
.iter()
.find(|wg| wg.job.label == "job2")
.unwrap();
assert_eq!(job2_group.wants.len(), 1);
}
}
}