add tests for want propagation, update dep read/miss reporting

This commit is contained in:
Stuart Axelbrooke 2025-10-20 07:51:15 -07:00
parent 1bca863be1
commit bbeceaa015
8 changed files with 435 additions and 85 deletions

View file

@ -138,7 +138,7 @@ impl BuildState {
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
let live_details: Vec<&PartitionDetail> = want
.partitions
.upstreams
.iter()
.map(|pref| self.partitions.get(&pref.r#ref))
.flatten()
@ -148,7 +148,7 @@ impl BuildState {
.map(|pd| pd.r#ref.clone().expect("pref must have ref"))
.collect();
let missing: Vec<PartitionRef> = want
.partitions
.upstreams
.iter()
.filter(|pref| self.partitions.get(&pref.r#ref).is_none())
.cloned()
@ -188,6 +188,7 @@ impl BuildState {
}
}
/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<PartitionRef>,
@ -201,6 +202,7 @@ pub struct WantSchedulability {
pub status: WantUpstreamStatus,
}
/// Newtype over the per-want schedulability results; the inner list is exposed as `.0`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl Into<bool> for WantsSchedulability {
@ -211,7 +213,7 @@ impl Into<bool> for WantsSchedulability {
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
self.status.missing.len() == 0 && self.status.tainted.len() == 0
self.status.missing.is_empty() && self.status.tainted.is_empty()
}
}
@ -244,6 +246,9 @@ mod tests {
/// Test builder: replaces the `partitions` field and hands the value back.
fn with_partitions(mut self, partitions: Vec<PartitionRef>) -> Self {
    self.partitions = partitions;
    self
}
/// Test builder: replaces the `upstreams` field and hands the value back.
fn with_upstreams(mut self, upstreams: Vec<PartitionRef>) -> Self {
    self.upstreams = upstreams;
    self
}
/// Test builder: replaces the `status` field and hands the value back.
fn with_status(mut self, status: Option<WantStatus>) -> Self {
    self.status = status;
    self
}
@ -293,7 +298,7 @@ mod tests {
let state = BuildState::default().with_wants(BTreeMap::from([(
test_partition.to_string(),
WantDetail::default()
.with_partitions(vec![test_partition.into()])
.with_upstreams(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]));

View file

@ -1,21 +1,85 @@
use uuid::Uuid;
use crate::{event_source, EventSource, JobRunMissingDeps, JobTriggeredEvent, MissingDeps, WantAttributedPartitions, WantCreateEventV1, WantDetail};
use crate::data_build_event::Event;
use crate::event_source::Source;
use crate::{
JobRunMissingDeps, JobRunReadDeps, JobTriggeredEvent, MissingDeps, ReadDeps, WantCreateEventV1,
WantDetail,
};
use uuid::Uuid;
// TODO - how do we version this?
pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
pub const DATABUILD_MISSING_DEPS_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
pub const DATABUILD_DEP_READ_JSON: &str = "DATABUILD_DEP_READ_JSON:";
pub fn parse_log_line(line: &str) -> Option<JobRunMissingDeps> {
line_matches(line).and_then(json_to_missing_deps)
/// A dependency report recognised in a single job log line.
pub enum DataDepLogLine {
/// Payload of a line carrying the `DATABUILD_MISSING_DEPS_JSON:` prefix.
DepMiss(JobRunMissingDeps),
/// Payload of a line carrying the `DATABUILD_DEP_READ_JSON:` prefix.
DepRead(JobRunReadDeps),
}
fn line_matches(line: &str) -> Option<&str> {
line.trim().strip_prefix(DATABUILD_JSON)
impl From<DataDepLogLine> for String {
    /// Renders the log line a job would print: the variant's prefix constant followed
    /// directly by the JSON-serialized payload.
    ///
    /// Panics if the payload cannot be serialized to JSON.
    fn from(value: DataDepLogLine) -> Self {
        let (prefix, payload) = match value {
            DataDepLogLine::DepMiss(miss) => (
                DATABUILD_MISSING_DEPS_JSON,
                serde_json::to_string(&miss).expect("json serialize"),
            ),
            DataDepLogLine::DepRead(read) => (
                DATABUILD_DEP_READ_JSON,
                serde_json::to_string(&read).expect("json serialize"),
            ),
        };
        format!("{}{}", prefix, payload)
    }
}
fn json_to_missing_deps(line: &str) -> Option<JobRunMissingDeps> {
serde_json::from_str(line).ok()
/// Dependency reports accumulated from a job run's log lines.
#[derive(Default, Debug)]
pub struct JobRunDataDepResults {
/// Every `ReadDeps` entry collected from dep-read log lines.
pub reads: Vec<ReadDeps>,
/// Every `MissingDeps` entry collected from dep-miss log lines.
pub misses: Vec<MissingDeps>,
}
impl JobRunDataDepResults {
    /// Folds one parsed dependency log line into the accumulated results.
    ///
    /// Entries of a dep-miss line are appended to `misses`; entries of a dep-read line
    /// are appended to `reads`.
    pub fn with(mut self, dep_log_line: DataDepLogLine) -> Self {
        match dep_log_line {
            DataDepLogLine::DepMiss(dm) => self.misses.extend(dm.missing_deps),
            DataDepLogLine::DepRead(rd) => self.reads.extend(rd.read_deps),
        }
        self
    }

    /// Parses each of `lines` with `parse_log_line` and accumulates every line that
    /// parses; lines for which it returns `None` are skipped.
    pub fn with_lines(self, lines: Vec<String>) -> Self {
        lines
            .iter()
            // `filter_map` states the intent directly: keep only the lines that parse.
            .filter_map(|line| parse_log_line(line))
            .fold(self, Self::with)
    }
}
/// Builds the accumulated dependency results from a job's captured log lines.
///
/// Implemented as `From` so the matching `Into` comes from the standard blanket impl;
/// existing `lines.into()` call sites keep working unchanged.
impl From<Vec<String>> for JobRunDataDepResults {
    fn from(lines: Vec<String>) -> Self {
        JobRunDataDepResults::default().with_lines(lines)
    }
}
/// Recognises a dependency report in one log line.
///
/// Returns `Some(DepMiss)` or `Some(DepRead)` when the trimmed line starts with the
/// corresponding prefix and the remainder deserializes as JSON. Returns `None` when
/// neither prefix matches or the JSON does not deserialize.
pub fn parse_log_line(line: &str) -> Option<DataDepLogLine> {
    // A line with the dep-miss prefix is never retried as a dep-read line.
    if let Some(payload) = line_matches(line, DATABUILD_MISSING_DEPS_JSON) {
        return serde_json::from_str(payload)
            .ok()
            .map(DataDepLogLine::DepMiss);
    }
    let payload = line_matches(line, DATABUILD_DEP_READ_JSON)?;
    serde_json::from_str(payload)
        .ok()
        .map(DataDepLogLine::DepRead)
}
/// Trims surrounding whitespace from `line` and, if what remains starts with `prefix`,
/// returns the text after the prefix; otherwise returns `None`.
///
/// The returned slice borrows only from `line`, so `prefix` may be shorter-lived.
fn line_matches<'a>(line: &'a str, prefix: &str) -> Option<&'a str> {
    line.trim().strip_prefix(prefix)
}
pub struct WantTimestamps {
@ -50,19 +114,25 @@ pub fn missing_deps_to_want_events(
job_run_id: &Uuid,
want_timestamps: WantTimestamps,
) -> Vec<Event> {
missing_deps.iter().map(|md| {
missing_deps
.iter()
.map(|md| {
Event::WantCreateV1(WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: md.missing.clone(),
data_timestamp: want_timestamps.data_timestamp,
ttl_seconds: want_timestamps.ttl_seconds,
sla_seconds: want_timestamps.sla_seconds,
source: Some(JobTriggeredEvent {
source: Some(
JobTriggeredEvent {
job_run_id: job_run_id.to_string(),
}.into()),
}
.into(),
),
comment: Some("Missing data".to_string()),
})
}).collect()
})
.collect()
}
#[cfg(test)]
@ -76,7 +146,10 @@ mod tests {
let result = parse_log_line(&log_line);
assert!(result.is_some());
let missing_deps = result.unwrap();
let missing_deps = match result.unwrap() {
DataDepLogLine::DepMiss(md) => md,
_ => panic!("expected dep miss log line"),
};
assert_eq!(missing_deps.missing_deps.len(), 2);
// First entry: 1:1 (one missing input -> one impacted output)
@ -92,4 +165,79 @@ mod tests {
assert_eq!(missing_deps.missing_deps[1].missing.len(), 1);
assert_eq!(missing_deps.missing_deps[1].missing[0].r#ref, "input/p2");
}
/// We can accumulate dep miss and read events
#[test]
fn test_accumulate_dep_parse_and_miss() {
    // Given
    let r = JobRunDataDepResults::default();
    assert_eq!(r.misses.len(), 0);
    assert_eq!(r.reads.len(), 0);

    // When
    let r = r
        .with(DataDepLogLine::DepRead(JobRunReadDeps {
            version: "1".into(),
            read_deps: vec![ReadDeps {
                impacted: vec!["output/p1".into()],
                read: vec!["input/p1".into()],
            }],
        }))
        .with(DataDepLogLine::DepRead(JobRunReadDeps {
            version: "1".into(),
            read_deps: vec![ReadDeps {
                impacted: vec!["output/p2".into()],
                read: vec!["input/p2".into(), "input/p2".into()],
            }],
        }))
        .with(DataDepLogLine::DepMiss(JobRunMissingDeps {
            version: "1".into(),
            missing_deps: vec![MissingDeps {
                impacted: vec!["output/p3".into()],
                missing: vec!["input/p3".into()],
            }],
        }));

    // Should: each line contributed its entries to the matching list, in order
    assert_eq!(r.reads.len(), 2);
    assert_eq!(r.misses.len(), 1);
    assert_eq!(r.reads[1].read.len(), 2);
    assert_eq!(r.misses[0].missing[0].r#ref, "input/p3");
}
/// Dep reports may be printed one per line, mixed in with unrelated output
#[test]
fn test_parse_multiple_missing_deps() {
    // Given: three dep report lines rendered the way a job would print them
    let read_p1: String = DataDepLogLine::DepRead(JobRunReadDeps {
        version: "1".into(),
        read_deps: vec![ReadDeps {
            impacted: vec!["output/p1".into()],
            read: vec!["input/p1".into()],
        }],
    })
    .into();
    let read_p2: String = DataDepLogLine::DepRead(JobRunReadDeps {
        version: "1".into(),
        read_deps: vec![ReadDeps {
            impacted: vec!["output/p2".into()],
            read: vec!["input/p2".into()],
        }],
    })
    .into();
    let miss_p3: String = DataDepLogLine::DepMiss(JobRunMissingDeps {
        version: "1".into(),
        missing_deps: vec![MissingDeps {
            impacted: vec!["output/p3".into()],
            missing: vec!["input/p3".into()],
        }],
    })
    .into();
    let stdout_lines: Vec<String> = vec![
        String::from("something"),
        read_p1,
        read_p2,
        String::from("something else"),
        miss_p3,
    ];

    // When
    let results = JobRunDataDepResults::default().with_lines(stdout_lines);

    // Should: the two non-report lines are skipped
    assert_eq!(results.misses.len(), 1);
    assert_eq!(results.reads.len(), 2);
}
}

View file

@ -85,6 +85,11 @@ message JobRunCancelEventV1 {
// Event recording the dependency reports of a job run that reported missing deps.
message JobRunMissingDepsEventV1 {
// ID of the job run that produced the reports
string job_run_id = 1;
// Missing-dep entries reported by the run
repeated MissingDeps missing_deps = 2;
// Read-dep entries reported by the same run
repeated ReadDeps read_deps = 3;
}
// Event carrying only the read deps of a job run.
// NOTE(review): no producer of this event is visible alongside this definition — confirm where it is emitted.
message JobRunReadDepsEventV1 {
// ID of the job run that produced the reports
string job_run_id = 1;
// Read-dep entries reported by the run
repeated ReadDeps read_deps = 2;
}
message JobRunMissingDeps {
string version = 1;
@ -95,6 +100,15 @@ message MissingDeps {
repeated PartitionRef impacted = 1;
repeated PartitionRef missing = 2;
}
// Payload a job logs as JSON (after the DATABUILD_DEP_READ_JSON: prefix) to report the deps it read.
message JobRunReadDeps {
// Version of the payload format
string version = 1;
// One entry per group of outputs and the inputs read for them
repeated ReadDeps read_deps = 2;
}
// Associates a set of output partitions with the upstream partitions read to build them.
message ReadDeps {
// The list of partition refs that are built using the read deps (can be just 1)
repeated PartitionRef impacted = 1;
// The partition refs that were read
repeated PartitionRef read = 2;
}
message WantCreateEventV1 {
@ -152,14 +166,17 @@ enum WantStatusCode {
message WantDetail {
  string want_id = 1;
  // The partitions directly wanted by this want
  repeated PartitionRef partitions = 2;
  uint64 data_timestamp = 3;
  uint64 ttl_seconds = 4;
  uint64 sla_seconds = 5;
  EventSource source = 6;
  optional string comment = 7;
  WantStatus status = 8;
  uint64 last_updated_timestamp = 9;
  // The upstream partitions, detected from a dep miss job run failure.
  // Added under a fresh tag rather than inserted as 3: field numbers identify fields
  // on the wire, so renumbering the existing fields would make any previously
  // serialized WantDetail decode into the wrong fields.
  repeated PartitionRef upstreams = 10;
  // TODO
}

View file

@ -1,6 +1,10 @@
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::{
event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent,
ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1,
WantCreateEventV1, WantDetail, WantStatus, WantStatusCode,
};
impl From<&WantCreateEventV1> for WantDetail {
fn from(e: &WantCreateEventV1) -> Self {
@ -12,12 +16,13 @@ impl From<WantCreateEventV1> for WantDetail {
WantDetail {
want_id: e.want_id,
partitions: e.partitions,
upstreams: vec![],
data_timestamp: e.data_timestamp,
ttl_seconds: e.ttl_seconds,
sla_seconds: e.sla_seconds,
source: e.source,
comment: e.comment,
status: Default::default(),
status: Some(Default::default()),
last_updated_timestamp: current_timestamp(),
}
}
@ -33,7 +38,6 @@ impl From<WantCancelEventV1> for Event {
}
}
impl From<WantStatusCode> for WantStatus {
fn from(code: WantStatusCode) -> Self {
WantStatus {
@ -71,12 +75,16 @@ impl From<JobRunStatusCode> for JobRunStatus {
impl From<ManuallyTriggeredEvent> for EventSource {
    /// Wraps the event as the `ManuallyTriggered` variant of the event source.
    fn from(value: ManuallyTriggeredEvent) -> Self {
        Self {
            source: Some(event_source::Source::ManuallyTriggered(value)),
        }
    }
}
impl From<JobTriggeredEvent> for EventSource {
    /// Wraps the event as the `JobTriggered` variant of the event source.
    fn from(value: JobTriggeredEvent) -> Self {
        Self {
            source: Some(event_source::Source::JobTriggered(value)),
        }
    }
}

View file

@ -1,6 +1,9 @@
use crate::data_build_event::Event;
use crate::data_deps::parse_log_line;
use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, JobRunSuccessEventV1, MissingDeps};
use crate::data_deps::{parse_log_line, JobRunDataDepResults};
use crate::{
EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus,
JobRunSuccessEventV1, MissingDeps, ReadDeps,
};
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufRead, BufReader};
@ -183,6 +186,7 @@ pub struct SubProcessRunning {
pub struct SubProcessCompleted {
/// Exit code of the finished process
pub exit_code: i32,
/// Lines of stdout captured from the process
pub stdout_buffer: Vec<String>,
/// Read-dep entries parsed out of the captured stdout lines
pub read_deps: Vec<ReadDeps>,
}
/// Failed state for SubProcess backend
@ -201,6 +205,7 @@ pub struct SubProcessCanceled {
pub struct SubProcessDepMiss {
/// Lines of stdout captured from the process
pub stdout_buffer: Vec<String>,
/// Missing-dep entries parsed out of the captured stdout lines
pub missing_deps: Vec<MissingDeps>,
/// Read-dep entries parsed out of the same stdout lines
pub read_deps: Vec<ReadDeps>,
}
impl JobRunBackend for SubProcessBackend {
@ -240,6 +245,7 @@ impl JobRunBackend for SubProcessBackend {
> {
// Non-blocking check for exit status
if let Some(exit_status) = running.process.try_wait()? {
// Job has exited
// Read any remaining stdout
if let Some(stdout) = running.process.stdout.take() {
let reader = BufReader::new(stdout);
@ -251,8 +257,9 @@ impl JobRunBackend for SubProcessBackend {
}
}
// Take ownership of stdout_buffer
// Take ownership of stdout_buffer, parse dep events
let stdout_buffer = std::mem::take(&mut running.stdout_buffer);
let deps: JobRunDataDepResults = stdout_buffer.clone().into();
// Check exit status and return appropriate result
match exit_status.code() {
@ -261,18 +268,13 @@ impl JobRunBackend for SubProcessBackend {
Ok(PollResult::Completed(SubProcessCompleted {
exit_code: 0,
stdout_buffer,
read_deps: deps.reads,
}))
}
Some(code) => {
let missing_deps = stdout_buffer.iter().flat_map(|s| parse_log_line(&s));
// Failed with exit code
match missing_deps.last() {
Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss {
stdout_buffer,
missing_deps: misses.missing_deps,
})),
None => {
match deps.misses {
vec if vec.is_empty() => {
// No missing deps, job failed
let reason = format!("Job failed with exit code {}", code);
Ok(PollResult::Failed(SubProcessFailed {
@ -281,6 +283,11 @@ impl JobRunBackend for SubProcessBackend {
stdout_buffer,
}))
}
misses => Ok(PollResult::DepMiss(SubProcessDepMiss {
stdout_buffer,
missing_deps: misses,
read_deps: deps.reads,
})),
}
}
None => {
@ -350,6 +357,7 @@ impl SubProcessDepMiss {
Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
job_run_id: job_run_id.to_string(),
missing_deps: self.missing_deps.clone(),
read_deps: self.read_deps.clone(),
})
}
}
@ -362,22 +370,17 @@ pub struct JobRunPollResult {
mod tests {
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_JSON;
use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend};
use crate::mock_job_run::MockJobRun;
use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps};
use std::collections::HashMap;
fn test_helper_path() -> String {
std::env::var("TEST_SRCDIR")
.map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir))
.unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string())
}
/// Happy path - run that succeeds should emit a JobRunSuccessEventV1
#[test]
fn test_job_run_success_returns_job_run_success_event() {
// Spawn a job run that will succeed (exit code 0)
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
// Start the job - this consumes the NotStarted and returns Running
let mut running_job = job_run.run().unwrap();
@ -410,11 +413,10 @@ mod tests {
#[test]
fn test_job_run_failure_returns_job_run_failure_event() {
// Spawn a job run
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
// Start the job with an exit code that indicates failure (non-zero)
let env: HashMap<String, String> =
HashMap::from([("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())]);
let env = MockJobRun::new().exit_code(1).to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion
@ -454,16 +456,13 @@ mod tests {
let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
(
"DATABUILD_TEST_FILE_CONTENT".to_string(),
"completed".to_string(),
),
]);
let env = MockJobRun::new()
.sleep_ms(1000)
.output_file(&temp_file, &"completed".to_string())
.exit_code(0)
.to_env();
let running_job = job_run.run_with_env(Some(env)).unwrap();
// Give it a tiny bit of time to start
@ -501,7 +500,7 @@ mod tests {
#[test]
fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let expected_dep_miss = JobRunMissingDeps {
version: "1".into(),
@ -512,11 +511,11 @@ mod tests {
};
let dep_miss_json =
serde_json::to_string(&expected_dep_miss).expect("Failed to serialize dep miss");
let dep_miss_line = format!("{}{}", DATABUILD_JSON, dep_miss_json);
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_STDOUT".to_string(), dep_miss_line),
("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()),
]);
let dep_miss_line = format!("{}{}", DATABUILD_MISSING_DEPS_JSON, dep_miss_json);
let env = MockJobRun::new()
.stdout_msg(&dep_miss_line)
.exit_code(1)
.to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion

View file

@ -7,6 +7,7 @@ mod build_state;
mod event_transforms;
mod event_defaults;
mod data_deps;
mod mock_job_run;
// Include generated protobuf code
include!("databuild.rs");

83
databuild/mock_job_run.rs Normal file
View file

@ -0,0 +1,83 @@
use std::collections::HashMap;
/// Builder for the environment variables that configure the `test_job_helper` binary.
///
/// Each setter consumes the builder and returns it, so calls chain; `to_env` renders the
/// final configuration as a `DATABUILD_TEST_*` variable map.
#[derive(Debug, Clone)]
pub struct MockJobRun {
    sleep_ms: u64,
    stdout_msg: String,
    output_file: Option<OutputFile>,
    exit_code: u8,
}

/// A file the mock job is asked to write, with the contents to put in it.
#[derive(Debug, Clone)]
pub struct OutputFile {
    path: String,
    contents: String,
}

impl Default for MockJobRun {
    /// No sleep, stdout message `"test executed"`, no output file, exit code 0.
    fn default() -> Self {
        Self {
            sleep_ms: 0,
            stdout_msg: "test executed".to_string(),
            output_file: None,
            exit_code: 0,
        }
    }
}

impl MockJobRun {
    /// Creates a builder holding the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the value rendered as `DATABUILD_TEST_SLEEP_MS`.
    pub fn sleep_ms(mut self, val: u64) -> Self {
        self.sleep_ms = val;
        self
    }

    /// Sets the value rendered as `DATABUILD_TEST_STDOUT`.
    pub fn stdout_msg(mut self, val: &str) -> Self {
        self.stdout_msg = val.to_owned();
        self
    }

    /// Requests an output file at `path` containing `contents`.
    pub fn output_file(mut self, path: &str, contents: &str) -> Self {
        self.output_file = Some(OutputFile {
            path: path.to_owned(),
            contents: contents.to_owned(),
        });
        self
    }

    /// Sets the value rendered as `DATABUILD_TEST_EXIT_CODE`.
    pub fn exit_code(mut self, val: u8) -> Self {
        self.exit_code = val;
        self
    }

    /// Renders the configuration as environment variables for the helper process.
    ///
    /// Sleep, exit code and stdout are always present; the two output-file variables are
    /// only set when an output file was requested.
    pub fn to_env(&self) -> HashMap<String, String> {
        let mut env = HashMap::new();
        env.insert(
            "DATABUILD_TEST_SLEEP_MS".to_string(),
            self.sleep_ms.to_string(),
        );
        env.insert(
            "DATABUILD_TEST_EXIT_CODE".to_string(),
            self.exit_code.to_string(),
        );
        env.insert("DATABUILD_TEST_STDOUT".to_string(), self.stdout_msg.clone());
        if let Some(output_file) = &self.output_file {
            env.insert(
                "DATABUILD_TEST_OUTPUT_FILE".to_string(),
                output_file.path.clone(),
            );
            // NOTE(review): the helper is presumed to read the contents from
            // DATABUILD_TEST_FILE_CONTENT, the key tests used when building this
            // environment by hand — verify against test_job_helper.
            env.insert(
                "DATABUILD_TEST_FILE_CONTENT".to_string(),
                output_file.contents.clone(),
            );
        }
        env
    }

    /// Path to the helper binary: under `$TEST_SRCDIR/_main/...` when `TEST_SRCDIR` is
    /// set, otherwise the relative `bazel-bin/...` location.
    pub fn bin_path() -> String {
        std::env::var("TEST_SRCDIR")
            .map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir))
            .unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string())
    }
}

View file

@ -108,6 +108,12 @@ struct WantGroup {
wants: Vec<WantDetail>,
}
impl WantGroup {
    /// Spawns a job run for this group by handing a copy of its wants to the group's job.
    pub fn spawn(&self) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
        let wants = self.wants.clone();
        self.job.spawn(wants)
    }
}
#[derive(Debug, Clone)]
struct GroupedWants {
want_groups: Vec<WantGroup>,
@ -175,6 +181,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
.flat_map(|wap| self.bel.state.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No wants found"))?;
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
dep_miss.state.missing_deps.clone(),
&dep_miss.job_run_id,
@ -183,6 +190,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
for event in want_events {
self.bel.append_event(&event)?;
}
// Record missing upstream status in want details
self.dep_miss_jobs.push(dep_miss);
}
}
@ -196,6 +204,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
// Collect unhandled wants, group by job that handles each partition,
let schedulability = self.bel.state.schedulable_wants();
println!("schedulability: {:?}", schedulability);
let schedulable_wants = schedulability
.0
.iter()
@ -205,6 +214,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
})
.collect();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
println!("grouped wants: {:?}", grouped_wants);
if !grouped_wants.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can be handled
@ -216,8 +226,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
.into())
} else {
for wg in grouped_wants.want_groups {
let job_run = wg.job.spawn(wg.wants)?;
self.not_started_jobs.push(job_run);
self.not_started_jobs.push(wg.spawn()?);
}
Ok(())
@ -250,11 +259,16 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
/** Entrypoint for running jobs */
pub fn join(mut self) -> Result<(), Box<dyn Error>> {
loop {
fn step(&mut self) -> Result<(), Box<dyn Error>> {
self.poll_job_runs()?;
self.poll_wants()?;
Ok(())
}
/** Entrypoint for running jobs */
pub fn join(&mut self) -> Result<(), Box<dyn Error>> {
loop {
self.step()?
}
}
}
@ -262,12 +276,38 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
mod tests {
use crate::build_event_log::MemoryBELStorage;
use crate::orchestrator::Orchestrator;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
Orchestrator::default()
}
/// Scenario 1
/// Simulates a databuild application with two jobs, alpha and beta, where alpha depends
/// on a single output from beta and beta has no deps. Both jobs run the mock job binary.
fn setup_scenario_a_to_b(
    mut orchestrator: Orchestrator<MemoryBELStorage>,
) -> Orchestrator<MemoryBELStorage> {
    // Every test job shares the mock entry point; only label and pattern differ
    let job = |label: &str, pattern: &str| JobConfiguration {
        label: label.to_string(),
        pattern: pattern.to_string(),
        entry_point: MockJobRun::bin_path(),
    };
    orchestrator.config = OrchestratorConfig {
        jobs: vec![job("alpha", "data/alpha"), job("beta", "data/beta")],
    };
    orchestrator
}
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
@ -304,7 +344,25 @@ mod tests {
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::orchestrator::tests::build_orchestrator;
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::util::current_timestamp;
use crate::WantCreateEventV1;
use uuid::Uuid;
impl WantCreateEventV1 {
    /// Baseline want-create event for tests: fresh random ID, no partitions, the current
    /// timestamp, and no source. Override fields with struct-update syntax.
    fn sample() -> Self {
        let want_id = Uuid::new_v4().to_string();
        let data_timestamp = current_timestamp();
        Self {
            want_id,
            partitions: Vec::new(),
            data_timestamp,
            ttl_seconds: 1000,
            sla_seconds: 1000,
            source: None,
            comment: Some(String::from("test want")),
        }
    }
}
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
@ -324,9 +382,39 @@ mod tests {
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
// (but in this case using a noop/mock child process)
#[test]
fn test_schedulable_wants_should_schedule() {
    // Given: the alpha/beta scenario and a single want for alpha's partition
    let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
    let events = vec![Event::WantCreateV1(WantCreateEventV1 {
        partitions: vec!["data/alpha".into()],
        ..WantCreateEventV1::sample()
    })];
    assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
    for e in events {
        orchestrator.bel.append_event(&e).expect("append");
    }
    assert_eq!(orchestrator.not_started_jobs.len(), 0);
    assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);

    // When
    orchestrator
        .poll_wants()
        .expect("shouldn't fail to poll wants");

    // Should schedule alpha job
    assert_eq!(orchestrator.not_started_jobs.len(), 1);
    let scheduled = orchestrator
        .not_started_jobs
        .first()
        .expect("one job run should be queued");
    assert_eq!(
        scheduled.state.args,
        vec!["data/alpha"],
        "should have scheduled alpha job"
    )
}
// Use case: A schedulable want that can't be matched to a job should return an error
@ -396,6 +484,7 @@ mod tests {
r#ref: r.to_string(),
})
.collect(),
upstreams: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,