diff --git a/databuild/build_state.rs b/databuild/build_state.rs index 9f17590..79493fe 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -138,7 +138,7 @@ impl BuildState { */ pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability { let live_details: Vec<&PartitionDetail> = want - .partitions + .upstreams .iter() .map(|pref| self.partitions.get(&pref.r#ref)) .flatten() @@ -148,7 +148,7 @@ impl BuildState { .map(|pd| pd.r#ref.clone().expect("pref must have ref")) .collect(); let missing: Vec = want - .partitions + .upstreams .iter() .filter(|pref| self.partitions.get(&pref.r#ref).is_none()) .cloned() @@ -188,6 +188,7 @@ impl BuildState { } } +/// The status of partitions required by a want to build (sensed from dep miss job run) #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct WantUpstreamStatus { pub live: Vec, @@ -201,6 +202,7 @@ pub struct WantSchedulability { pub status: WantUpstreamStatus, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct WantsSchedulability(pub Vec); impl Into for WantsSchedulability { @@ -211,7 +213,7 @@ impl Into for WantsSchedulability { impl WantSchedulability { pub fn is_schedulable(&self) -> bool { - self.status.missing.len() == 0 && self.status.tainted.len() == 0 + self.status.missing.is_empty() && self.status.tainted.is_empty() } } @@ -244,6 +246,9 @@ mod tests { fn with_partitions(self, partitions: Vec) -> Self { Self { partitions, ..self } } + fn with_upstreams(self, upstreams: Vec) -> Self { + Self { upstreams, ..self } + } fn with_status(self, status: Option) -> Self { Self { status, ..self } } @@ -293,7 +298,7 @@ mod tests { let state = BuildState::default().with_wants(BTreeMap::from([( test_partition.to_string(), WantDetail::default() - .with_partitions(vec![test_partition.into()]) + .with_upstreams(vec![test_partition.into()]) .with_status(Some(WantStatusCode::WantIdle.into())), )])); diff --git a/databuild/data_deps.rs b/databuild/data_deps.rs index 2053c81..66dcec8 100644 --- a/databuild/data_deps.rs +++ b/databuild/data_deps.rs @@ -1,21 +1,85 @@ -use uuid::Uuid; -use crate::{event_source, EventSource, JobRunMissingDeps, JobTriggeredEvent, MissingDeps, WantAttributedPartitions, WantCreateEventV1, WantDetail}; use crate::data_build_event::Event; -use crate::event_source::Source; +use crate::{ + JobRunMissingDeps, JobRunReadDeps, JobTriggeredEvent, MissingDeps, ReadDeps, WantCreateEventV1, + WantDetail, +}; +use uuid::Uuid; // TODO - how do we version this? -pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:"; +pub const DATABUILD_MISSING_DEPS_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:"; +pub const DATABUILD_DEP_READ_JSON: &str = "DATABUILD_DEP_READ_JSON:"; -pub fn parse_log_line(line: &str) -> Option { - line_matches(line).and_then(json_to_missing_deps) +pub enum DataDepLogLine { + DepMiss(JobRunMissingDeps), + DepRead(JobRunReadDeps), } -fn line_matches(line: &str) -> Option<&str> { - line.trim().strip_prefix(DATABUILD_JSON) +impl From for String { + fn from(value: DataDepLogLine) -> Self { + match value { + DataDepLogLine::DepMiss(dm) => { + format!( + "{}{}", + DATABUILD_MISSING_DEPS_JSON, + serde_json::to_string(&dm).expect("json serialize") + ) + } + DataDepLogLine::DepRead(dr) => { + format!( + "{}{}", + DATABUILD_DEP_READ_JSON, + serde_json::to_string(&dr).expect("json serialize") + ) + } + } + } } -fn json_to_missing_deps(line: &str) -> Option { - serde_json::from_str(line).ok() +#[derive(Default, Debug)] +pub struct JobRunDataDepResults { + pub reads: Vec, + pub misses: Vec, +} + +impl JobRunDataDepResults { + pub fn with(mut self, dep_log_line: DataDepLogLine) -> Self { + match dep_log_line { + DataDepLogLine::DepMiss(dm) => self.misses.extend(dm.missing_deps), + DataDepLogLine::DepRead(rd) => self.reads.extend(rd.read_deps), + } + self + } + + pub fn with_lines(mut self, lines: Vec) -> Self { + lines + .iter() + .flat_map(|line| parse_log_line(line)) + .fold(self, |agg, it| agg.with(it)) + } +} + +impl Into for Vec { + fn into(self) -> JobRunDataDepResults { + JobRunDataDepResults::default().with_lines(self) + } +} + +pub fn parse_log_line(line: &str) -> Option { + if let Some(message) = line_matches(line, DATABUILD_MISSING_DEPS_JSON) { + serde_json::from_str(message) + .ok() + .map(|dm| DataDepLogLine::DepMiss(dm)) + } else if let Some(message) = line_matches(line, DATABUILD_DEP_READ_JSON) { + serde_json::from_str(message) + .ok() + .map(|dm| DataDepLogLine::DepRead(dm)) + } else { + None + } +} + +fn line_matches<'a>(line: &'a str, prefix: &'a str) -> Option<&'a str> { + line.trim().strip_prefix(prefix) } pub struct WantTimestamps { @@ -50,19 +114,25 @@ pub fn missing_deps_to_want_events( job_run_id: &Uuid, want_timestamps: WantTimestamps, ) -> Vec { - missing_deps.iter().map(|md| { - Event::WantCreateV1(WantCreateEventV1 { - want_id: Uuid::new_v4().into(), - partitions: md.missing.clone(), - data_timestamp: want_timestamps.data_timestamp, - ttl_seconds: want_timestamps.ttl_seconds, - sla_seconds: want_timestamps.sla_seconds, - source: Some(JobTriggeredEvent { - job_run_id: job_run_id.to_string(), - }.into()), - comment: Some("Missing data".to_string()), + missing_deps + .iter() + .map(|md| { + Event::WantCreateV1(WantCreateEventV1 { + want_id: Uuid::new_v4().into(), + partitions: md.missing.clone(), + data_timestamp: want_timestamps.data_timestamp, + ttl_seconds: want_timestamps.ttl_seconds, + sla_seconds: want_timestamps.sla_seconds, + source: Some( + JobTriggeredEvent { + job_run_id: job_run_id.to_string(), + } + .into(), + ), + comment: Some("Missing data".to_string()), + }) }) - }).collect() + .collect() } #[cfg(test)] @@ -76,7 +146,10 @@ mod tests { let result = parse_log_line(&log_line); assert!(result.is_some()); - let missing_deps = result.unwrap(); + let missing_deps = match result.unwrap() { + DataDepLogLine::DepMiss(md) => md, + _ => panic!("expected dep miss log line"), + }; assert_eq!(missing_deps.missing_deps.len(), 2); // First entry: 1:1 (one missing input -> one impacted output) @@ -92,4 +165,79 @@ mod tests { assert_eq!(missing_deps.missing_deps[1].missing.len(), 1); assert_eq!(missing_deps.missing_deps[1].missing[0].r#ref, "input/p2"); } + + /// We can accumulate dep miss and read events + #[test] + fn test_accumulate_dep_parse_and_miss() { + // Given + let r = JobRunDataDepResults::default(); + assert_eq!(r.misses.len(), 0); + assert_eq!(r.reads.len(), 0); + + // When + let r = r + .with(DataDepLogLine::DepRead(JobRunReadDeps { + version: "1".into(), + read_deps: vec![ReadDeps { + impacted: vec!["output/p1".into()], + read: vec!["input/p1".into()], + }], + })) + .with(DataDepLogLine::DepRead(JobRunReadDeps { + version: "1".into(), + read_deps: vec![ReadDeps { + impacted: vec!["output/p2".into()], + read: vec!["input/p2".into(), "input/p2".into()], + }], + })) + .with(DataDepLogLine::DepMiss(JobRunMissingDeps { + version: "1".into(), + missing_deps: vec![MissingDeps { + impacted: vec!["output/p3".into()], + missing: vec!["input/p3".into()], + }], + })); + } + + /// It's acceptable to print separately for each missing dep + #[test] + fn test_parse_multiple_missing_deps() { + // Given + let r = JobRunDataDepResults::default(); + let stdout_lines: Vec = vec![ + "something".into(), + DataDepLogLine::DepRead(JobRunReadDeps { + version: "1".into(), + read_deps: vec![ReadDeps { + impacted: vec!["output/p1".into()], + read: vec!["input/p1".into()], + }], + }) + .into(), + DataDepLogLine::DepRead(JobRunReadDeps { + version: "1".into(), + read_deps: vec![ReadDeps { + impacted: vec!["output/p2".into()], + read: vec!["input/p2".into()], + }], + }) + .into(), + "something else".into(), + DataDepLogLine::DepMiss(JobRunMissingDeps { + version: "1".into(), + missing_deps: vec![MissingDeps { + impacted: vec!["output/p3".into()], + missing: vec!["input/p3".into()], + }], + }) + .into(), + ]; + + // When + let results = r.with_lines(stdout_lines); + + // Should + assert_eq!(results.misses.len(), 1); + assert_eq!(results.reads.len(), 2); + } } diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 8ef4c9b..a1a1c3b 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -85,6 +85,11 @@ message JobRunCancelEventV1 { message JobRunMissingDepsEventV1 { string job_run_id = 1; repeated MissingDeps missing_deps = 2; + repeated ReadDeps read_deps = 3; +} +message JobRunReadDepsEventV1 { + string job_run_id = 1; + repeated ReadDeps read_deps = 2; } message JobRunMissingDeps { string version = 1; @@ -95,6 +100,15 @@ message MissingDeps { repeated PartitionRef impacted = 1; repeated PartitionRef missing = 2; } +message JobRunReadDeps { + string version = 1; + repeated ReadDeps read_deps = 2; +} +message ReadDeps { + // The list of partition refs that are built using the read deps (can be just 1) + repeated PartitionRef impacted = 1; + repeated PartitionRef read = 2; +} message WantCreateEventV1 { @@ -152,14 +166,17 @@ enum WantStatusCode { message WantDetail { string want_id = 1; + // The partitions directly wanted by this want repeated PartitionRef partitions = 2; - uint64 data_timestamp = 3; - uint64 ttl_seconds = 4; - uint64 sla_seconds = 5; - EventSource source = 6; - optional string comment = 7; - WantStatus status = 8; - uint64 last_updated_timestamp = 9; + // The upstream partitions, detected from a dep miss job run failure + repeated PartitionRef upstreams = 3; + uint64 data_timestamp = 4; + uint64 ttl_seconds = 5; + uint64 sla_seconds = 6; + EventSource source = 7; + optional string comment = 8; + WantStatus status = 9; + uint64 last_updated_timestamp = 10; // TODO } diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 515875c..931673f 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,6 +1,10 @@ use crate::data_build_event::Event; use crate::util::current_timestamp; -use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; +use crate::{ + event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, + ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, + WantCreateEventV1, WantDetail, WantStatus, WantStatusCode, +}; impl From<&WantCreateEventV1> for WantDetail { fn from(e: &WantCreateEventV1) -> Self { @@ -12,12 +16,13 @@ impl From for WantDetail { WantDetail { want_id: e.want_id, partitions: e.partitions, + upstreams: vec![], data_timestamp: e.data_timestamp, ttl_seconds: e.ttl_seconds, sla_seconds: e.sla_seconds, source: e.source, comment: e.comment, - status: Default::default(), + status: Some(Default::default()), last_updated_timestamp: current_timestamp(), } } @@ -33,7 +38,6 @@ impl From for Event { } } - impl From for WantStatus { fn from(code: WantStatusCode) -> Self { WantStatus { @@ -71,12 +75,16 @@ impl From for JobRunStatus { impl From for EventSource { fn from(value: ManuallyTriggeredEvent) -> Self { - Self { source: Some(event_source::Source::ManuallyTriggered(value)) } + Self { + source: Some(event_source::Source::ManuallyTriggered(value)), + } } } impl From for EventSource { fn from(value: JobTriggeredEvent) -> Self { - Self { source: Some(event_source::Source::JobTriggered(value)) } + Self { + source: Some(event_source::Source::JobTriggered(value)), + } } } diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 3ce693a..c2c7441 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,6 +1,9 @@ use crate::data_build_event::Event; -use crate::data_deps::parse_log_line; -use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, JobRunSuccessEventV1, MissingDeps}; +use crate::data_deps::{parse_log_line, JobRunDataDepResults}; +use crate::{ + EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, + JobRunSuccessEventV1, MissingDeps, ReadDeps, +}; use std::collections::HashMap; use std::error::Error; use std::io::{BufRead, BufReader}; @@ -183,6 +186,7 @@ pub struct SubProcessRunning { pub struct SubProcessCompleted { pub exit_code: i32, pub stdout_buffer: Vec, + pub read_deps: Vec, } /// Failed state for SubProcess backend @@ -201,6 +205,7 @@ pub struct SubProcessCanceled { pub struct SubProcessDepMiss { pub stdout_buffer: Vec, pub missing_deps: Vec, + pub read_deps: Vec, } impl JobRunBackend for SubProcessBackend { @@ -240,6 +245,7 @@ impl JobRunBackend for SubProcessBackend { > { // Non-blocking check for exit status if let Some(exit_status) = running.process.try_wait()? { + // Job has exited // Read any remaining stdout if let Some(stdout) = running.process.stdout.take() { let reader = BufReader::new(stdout); @@ -251,8 +257,9 @@ impl JobRunBackend for SubProcessBackend { } } - // Take ownership of stdout_buffer + // Take ownership of stdout_buffer, parse dep events let stdout_buffer = std::mem::take(&mut running.stdout_buffer); + let deps: JobRunDataDepResults = stdout_buffer.clone().into(); // Check exit status and return appropriate result match exit_status.code() { @@ -261,18 +268,13 @@ impl JobRunBackend for SubProcessBackend { Ok(PollResult::Completed(SubProcessCompleted { exit_code: 0, stdout_buffer, + read_deps: deps.reads, })) } Some(code) => { - let missing_deps = stdout_buffer.iter().flat_map(|s| parse_log_line(&s)); - // Failed with exit code - match missing_deps.last() { - Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss { - stdout_buffer, - missing_deps: misses.missing_deps, - })), - None => { + match deps.misses { + vec if vec.is_empty() => { // No missing deps, job failed let reason = format!("Job failed with exit code {}", code); Ok(PollResult::Failed(SubProcessFailed { @@ -281,6 +283,11 @@ impl JobRunBackend for SubProcessBackend { stdout_buffer, })) } + misses => Ok(PollResult::DepMiss(SubProcessDepMiss { + stdout_buffer, + missing_deps: misses, + read_deps: deps.reads, + })), } } None => { @@ -350,6 +357,7 @@ impl SubProcessDepMiss { Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 { job_run_id: job_run_id.to_string(), missing_deps: self.missing_deps.clone(), + read_deps: self.read_deps.clone(), }) } } @@ -362,22 +370,17 @@ pub struct JobRunPollResult { mod tests { use crate::data_build_event::Event; - use crate::data_deps::DATABUILD_JSON; + use crate::data_deps::DATABUILD_MISSING_DEPS_JSON; use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend}; + use crate::mock_job_run::MockJobRun; use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps}; use std::collections::HashMap; - fn test_helper_path() -> String { - std::env::var("TEST_SRCDIR") - .map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir)) - .unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string()) - } - /// Happy path - run that succeeds should emit a JobRunSuccessEventV1 #[test] fn test_job_run_success_returns_job_run_success_event() { // Spawn a job run that will succeed (exit code 0) - let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]); + let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); // Start the job - this consumes the NotStarted and returns Running let mut running_job = job_run.run().unwrap(); @@ -410,11 +413,10 @@ mod tests { #[test] fn test_job_run_failure_returns_job_run_failure_event() { // Spawn a job run - let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]); + let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); // Start the job with an exit code that indicates failure (non-zero) - let env: HashMap = - HashMap::from([("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())]); + let env = MockJobRun::new().exit_code(1).to_env(); let mut running_job = job_run.run_with_env(Some(env)).unwrap(); // Poll until we get completion @@ -454,16 +456,13 @@ mod tests { let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4()); // Spawn a job run that will sleep for 1 second and write a file - let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]); + let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); - let env: HashMap = HashMap::from([ - ("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()), - ("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()), - ( - "DATABUILD_TEST_FILE_CONTENT".to_string(), - "completed".to_string(), - ), - ]); + let env = MockJobRun::new() + .sleep_ms(1000) + .output_file(&temp_file, &"completed".to_string()) + .exit_code(0) + .to_env(); let running_job = job_run.run_with_env(Some(env)).unwrap(); // Give it a tiny bit of time to start @@ -501,7 +500,7 @@ mod tests { #[test] fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() { // Spawn a job run that will sleep for 1 second and write a file - let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]); + let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); let expected_dep_miss = JobRunMissingDeps { version: "1".into(), @@ -512,11 +511,11 @@ mod tests { }; let dep_miss_json = serde_json::to_string(&expected_dep_miss).expect("Failed to serialize dep miss"); - let dep_miss_line = format!("{}{}", DATABUILD_JSON, dep_miss_json); - let env: HashMap = HashMap::from([ - ("DATABUILD_TEST_STDOUT".to_string(), dep_miss_line), - ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()), - ]); + let dep_miss_line = format!("{}{}", DATABUILD_MISSING_DEPS_JSON, dep_miss_json); + let env = MockJobRun::new() + .stdout_msg(&dep_miss_line) + .exit_code(1) + .to_env(); let mut running_job = job_run.run_with_env(Some(env)).unwrap(); // Poll until we get completion diff --git a/databuild/lib.rs b/databuild/lib.rs index 6639e42..19ef707 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -7,6 +7,7 @@ mod build_state; mod event_transforms; mod event_defaults; mod data_deps; +mod mock_job_run; // Include generated protobuf code include!("databuild.rs"); diff --git a/databuild/mock_job_run.rs b/databuild/mock_job_run.rs new file mode 100644 index 0000000..9f4e3d8 --- /dev/null +++ b/databuild/mock_job_run.rs @@ -0,0 +1,83 @@ +use std::collections::HashMap; + +pub struct MockJobRun { + sleep_ms: u64, + stdout_msg: String, + output_file: Option, + exit_code: u8, +} + +pub struct OutputFile { + path: String, + contents: String, +} + +impl Default for MockJobRun { + fn default() -> Self { + Self { + sleep_ms: 0, + stdout_msg: "test executed".to_string(), + output_file: None, + exit_code: 0, + } + } +} + +impl MockJobRun { + pub fn new() -> Self { + Self::default() + } + + pub fn sleep_ms(mut self, val: u64) -> Self { + self.sleep_ms = val; + self + } + + pub fn stdout_msg(mut self, val: &String) -> Self { + self.stdout_msg = val.into(); + self + } + + pub fn output_file(mut self, path: &String, contents: &String) -> Self { + self.output_file = Some(OutputFile { + path: path.to_string(), + contents: contents.to_string(), + }); + self + } + + pub fn exit_code(mut self, val: u8) -> Self { + self.exit_code = val; + self + } + + pub fn to_env(&self) -> HashMap { + let mut env = HashMap::new(); + env.insert( + "DATABUILD_TEST_SLEEP_MS".to_string(), + self.sleep_ms.to_string(), + ); + env.insert( + "DATABUILD_TEST_EXIT_CODE".to_string(), + self.exit_code.to_string(), + ); + env.insert("DATABUILD_TEST_STDOUT".to_string(), self.stdout_msg.clone()); + if let Some(output_file) = &self.output_file { + env.insert( + "DATABUILD_TEST_OUTPUT_FILE".to_string(), + output_file.path.clone(), + ); + env.insert( + "DATABUILD_TEST_OUTPUT_CONTENTS".to_string(), + output_file.contents.clone(), + ); + } + env + } + + pub fn bin_path() -> String { + std::env::var("TEST_SRCDIR") + .map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir)) + .unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string()) + } +} diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 7e5287c..18b98ba 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -108,6 +108,12 @@ struct WantGroup { wants: Vec, } +impl WantGroup { + pub fn spawn(&self) -> Result, std::io::Error> { + self.job.spawn(self.wants.clone()) + } +} + #[derive(Debug, Clone)] struct GroupedWants { want_groups: Vec, @@ -175,6 +181,7 @@ impl Orchestrator { .flat_map(|wap| self.bel.state.get_want(&wap.want_id).map(|w| w.into())) .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) .ok_or(format!("No wants found"))?; + // Create wants from dep misses let want_events = missing_deps_to_want_events( dep_miss.state.missing_deps.clone(), &dep_miss.job_run_id, @@ -183,6 +190,7 @@ impl Orchestrator { for event in want_events { self.bel.append_event(&event)?; } + // Record missing upstream status in want details self.dep_miss_jobs.push(dep_miss); } } @@ -196,6 +204,7 @@ impl Orchestrator { fn poll_wants(&mut self) -> Result<(), Box> { // Collect unhandled wants, group by job that handles each partition, let schedulability = self.bel.state.schedulable_wants(); + println!("schedulability: {:?}", schedulability); let schedulable_wants = schedulability .0 .iter() @@ -205,6 +214,7 @@ impl Orchestrator { }) .collect(); let grouped_wants = Orchestrator::::group_wants(&self.config, &schedulable_wants); + println!("grouped wants: {:?}", grouped_wants); if !grouped_wants.unhandled_wants.is_empty() { // All wants must be mapped to jobs that can be handled @@ -216,8 +226,7 @@ impl Orchestrator { .into()) } else { for wg in grouped_wants.want_groups { - let job_run = wg.job.spawn(wg.wants)?; - self.not_started_jobs.push(job_run); + self.not_started_jobs.push(wg.spawn()?); } Ok(()) @@ -250,11 +259,16 @@ impl Orchestrator { } } + fn step(&mut self) -> Result<(), Box> { + self.poll_job_runs()?; + self.poll_wants()?; + Ok(()) + } + /** Entrypoint for running jobs */ - pub fn join(mut self) -> Result<(), Box> { + pub fn join(&mut self) -> Result<(), Box> { loop { - self.poll_job_runs()?; - self.poll_wants()?; + self.step()? } } } @@ -262,12 +276,38 @@ impl Orchestrator { #[cfg(test)] mod tests { use crate::build_event_log::MemoryBELStorage; - use crate::orchestrator::Orchestrator; + use crate::job::JobConfiguration; + use crate::mock_job_run::MockJobRun; + use crate::orchestrator::{Orchestrator, OrchestratorConfig}; fn build_orchestrator() -> Orchestrator { Orchestrator::default() } + /// Scenario 1 + /// A test scenario that simulates a databuild application with 2 jobs, alpha and beta, with + /// alpha depending on a single output from beta, and beta with no deps. + fn setup_scenario_a_to_b( + mut orchestrator: Orchestrator, + ) -> Orchestrator { + // Define test jobs + orchestrator.config = OrchestratorConfig { + jobs: vec![ + JobConfiguration { + label: "alpha".to_string(), + pattern: "data/alpha".to_string(), + entry_point: MockJobRun::bin_path(), + }, + JobConfiguration { + label: "beta".to_string(), + pattern: "data/beta".to_string(), + entry_point: MockJobRun::bin_path(), + }, + ], + }; + orchestrator + } + // The orchestrator needs to be able to actually execute job runs mod run_jobs { // Use case: the orchestrator should be able to execute a spawned-process job @@ -304,7 +344,25 @@ mod tests { // The orchestrator polls wants so that it can react to new wants created by users, or to wants // created by itself (for dep miss job run failures) mod poll_wants { - use crate::orchestrator::tests::build_orchestrator; + use crate::data_build_event::Event; + use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b}; + use crate::util::current_timestamp; + use crate::WantCreateEventV1; + use uuid::Uuid; + + impl WantCreateEventV1 { + fn sample() -> Self { + Self { + want_id: Uuid::new_v4().to_string(), + partitions: vec![], + data_timestamp: current_timestamp(), + ttl_seconds: 1000, + sla_seconds: 1000, + source: None, + comment: Some("test want".to_string()), + } + } + } // Use case: Empty schedulable wants is a valid case, and should create no new jobs. #[test] @@ -324,9 +382,39 @@ mod tests { // Use case: Some schedulable wants with jobs that can be matched should launch those jobs // (but in this case using a noop/mock child process) #[test] - #[ignore] fn test_schedulable_wants_should_schedule() { - todo!() + // Given + let mut orchestrator = setup_scenario_a_to_b(build_orchestrator()); + let events = vec![Event::WantCreateV1(WantCreateEventV1 { + partitions: vec!["data/alpha".into()], + ..WantCreateEventV1::sample() + })]; + assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0); + for e in events { + orchestrator.bel.append_event(&e).expect("append"); + } + assert_eq!(orchestrator.not_started_jobs.len(), 0); + + // When + assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1); + orchestrator + .poll_wants() + .expect("shouldn't fail to poll wants"); + + // Should schedule alpha job + assert_eq!(orchestrator.not_started_jobs.len(), 1); + assert_eq!( + orchestrator + .not_started_jobs + .iter() + .take(1) + .last() + .unwrap() + .state + .args, + vec!["data/alpha"], + "should have scheduled alpha job" + ) } // Use case: A schedulable want that can't be matched to a job should return an error @@ -396,6 +484,7 @@ mod tests { r#ref: r.to_string(), }) .collect(), + upstreams: vec![], data_timestamp: 0, ttl_seconds: 0, sla_seconds: 0,