From 5361e295e052b267689f517e6624ea595369206f Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Mon, 20 Oct 2025 09:04:22 -0700 Subject: [PATCH] impl test_job_fail_want_mapping --- databuild/build_state.rs | 8 +- databuild/job_run.rs | 5 +- databuild/orchestrator.rs | 185 ++++++++++++++++++++++++++++---------- 3 files changed, 142 insertions(+), 56 deletions(-) diff --git a/databuild/build_state.rs b/databuild/build_state.rs index 79493fe..d0f9653 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -35,10 +35,10 @@ and updates, which is exceptionally fast. #[derive(Debug, Clone)] pub struct BuildState { - wants: BTreeMap, - taints: BTreeMap, - partitions: BTreeMap, - job_runs: BTreeMap, + pub wants: BTreeMap, + pub taints: BTreeMap, + pub partitions: BTreeMap, + pub job_runs: BTreeMap, } impl Default for BuildState { diff --git a/databuild/job_run.rs b/databuild/job_run.rs index c2c7441..a185ab2 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,5 +1,5 @@ use crate::data_build_event::Event; -use crate::data_deps::{parse_log_line, JobRunDataDepResults}; +use crate::data_deps::JobRunDataDepResults; use crate::{ EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, JobRunSuccessEventV1, MissingDeps, ReadDeps, @@ -63,7 +63,7 @@ pub enum PollResult { pub struct JobRun { pub job_run_id: Uuid, pub state: S, - _backend: PhantomData, + pub _backend: PhantomData, } /// Type aliases for specific states @@ -374,7 +374,6 @@ mod tests { use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend}; use crate::mock_job_run::MockJobRun; use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps}; - use std::collections::HashMap; /// Happy path - run that succeeds should emit a JobRunSuccessEventV1 #[test] diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 0df9fb3..36b3a05 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1,4 +1,5 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage}; +use crate::build_state::BuildState; use crate::data_build_event::Event; use crate::data_deps::{missing_deps_to_want_events, WantTimestamps}; use crate::job::JobConfiguration; @@ -163,31 +164,7 @@ impl Orchestrator { self.failed_jobs.push(failed); } JobRunVisitResult::DepMiss(dep_miss) => { - // Append literal dep miss - let event: Event = dep_miss.state.to_event(&dep_miss.id()); - self.bel.append_event(&event)?; - // Append wants from dep miss - let job_run_detail = self - .bel - .state - .get_job_run(&dep_miss.job_run_id.to_string()) - .ok_or(format!( - "Unable to find job run with id `{}`", - dep_miss.job_run_id - ))?; - let want_timestamps: WantTimestamps = job_run_detail - .servicing_wants - .iter() - .flat_map(|wap| self.bel.state.get_want(&wap.want_id).map(|w| w.into())) - .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) - .ok_or(format!("No wants found"))?; - // Create wants from dep misses - let want_events = missing_deps_to_want_events( - dep_miss.state.missing_deps.clone(), - &dep_miss.job_run_id, - want_timestamps, - ); - for event in want_events { + for event in dep_miss_to_events(&self.bel.state, &dep_miss)? { self.bel.append_event(&event)?; } // Record missing upstream status in want details @@ -273,17 +250,67 @@ impl Orchestrator { } } +fn dep_miss_to_events( + bel_state: &BuildState, + dep_miss: &DepMissJobRun, +) -> Result, Box> { + let mut events = vec![]; + // Append literal job run dep miss + events.push(dep_miss.state.to_event(&dep_miss.id())); + // Append wants from dep miss + let job_run_detail = bel_state + .get_job_run(&dep_miss.job_run_id.to_string()) + .ok_or(format!( + "Unable to find job run with id `{}`", + dep_miss.job_run_id + ))?; + // Infer data/SLA timestamps from upstream want + let want_timestamps: WantTimestamps = job_run_detail + .servicing_wants + .iter() + .flat_map(|wap| bel_state.get_want(&wap.want_id).map(|w| w.into())) + .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) + .ok_or(format!("No servicing wants found"))?; + // Create wants from dep misses + let want_events = missing_deps_to_want_events( + dep_miss.state.missing_deps.clone(), + &dep_miss.job_run_id, + want_timestamps, + ); + events.extend(want_events); + + Ok(events) +} + #[cfg(test)] mod tests { use crate::build_event_log::MemoryBELStorage; use crate::job::JobConfiguration; use crate::mock_job_run::MockJobRun; use crate::orchestrator::{Orchestrator, OrchestratorConfig}; + use crate::util::current_timestamp; + use crate::WantCreateEventV1; + use uuid::Uuid; fn build_orchestrator() -> Orchestrator { Orchestrator::default() } + impl WantCreateEventV1 { + /// A naive random want for testing purposes + pub fn sample() -> Self { + Self { + want_id: Uuid::new_v4().to_string(), + partitions: vec![], + data_timestamp: current_timestamp(), + ttl_seconds: 1000, + sla_seconds: 1000, + source: None, + comment: Some("test want".to_string()), + } + } + } + /// Scenario 1 /// A test scenario that simulates a databuild application with 2 jobs, alpha and beta, with /// alpha depending on a single output from beta, and beta with no deps. @@ -346,23 +373,7 @@ mod tests { mod poll_wants { use crate::data_build_event::Event; use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b}; - use crate::util::current_timestamp; use crate::WantCreateEventV1; - use uuid::Uuid; - - impl WantCreateEventV1 { - fn sample() -> Self { - Self { - want_id: Uuid::new_v4().to_string(), - partitions: vec![], - data_timestamp: current_timestamp(), - ttl_seconds: 1000, - sla_seconds: 1000, - source: None, - comment: Some("test want".to_string()), - } - } - } // Use case: Empty schedulable wants is a valid case, and should create no new jobs. #[test] @@ -437,21 +448,97 @@ mod tests { } } - // Orchestrator want creation is the means of data dependency propagation, allowing the - // orchestrator to create partitions needed by jobs that produce the existing wanted partitions. + /// Orchestrator want creation is the means of data dependency propagation, allowing the + /// orchestrator to create partitions needed by jobs that produce the existing wanted partitions. mod want_create { - // Use case: The orchestrator should map a failed job into a set of wants + use crate::data_build_event::Event; + use crate::job_run::{DepMissJobRun, SubProcessDepMiss}; + use crate::orchestrator::dep_miss_to_events; + use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b}; + use crate::{JobRunDetail, MissingDeps, WantAttributedPartitions, WantCreateEventV1}; + use std::marker::PhantomData; + use uuid::Uuid; + + /// Use case: The orchestrator should map a failed job into a set of wants #[test] - #[ignore] fn test_job_fail_want_mapping() { - todo!() + // Given a + let mut orchestrator = setup_scenario_a_to_b(build_orchestrator()); + // Add event for originating want + orchestrator + .bel + .append_event(&Event::WantCreateV1(WantCreateEventV1 { + partitions: vec!["data/beta".into()], + ..WantCreateEventV1::sample() + })) + .expect("event append"); + // Create failed job run detail + let job_run_id = Uuid::new_v4(); + let job_run = JobRunDetail { + servicing_wants: orchestrator + .bel + .state + .wants + .values() + .map(|w| WantAttributedPartitions { + want_id: w.want_id.clone(), + partitions: w.partitions.clone(), + }) + .collect(), + ..JobRunDetail::default() + }; + orchestrator + .bel + .state + .job_runs + .insert(job_run_id.into(), job_run); + // Add event for job failure + let dep_miss_job_run = DepMissJobRun { + job_run_id, + state: SubProcessDepMiss { + stdout_buffer: vec![], + missing_deps: vec![MissingDeps { + impacted: vec!["data/beta".into()], + missing: vec!["data/alpha".into()], + }], + read_deps: vec![], + }, + _backend: PhantomData, + }; + + // When calculating events from dep miss + let events = dep_miss_to_events(&orchestrator.bel.state, &dep_miss_job_run).unwrap(); + + // Should have scheduled a job for alpha + assert_eq!( + events + .iter() + .filter(|e| match e { + Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()), + _ => false, + }) + .count(), + 1 + ); + assert!( + orchestrator.not_started_jobs.is_empty(), + "shouldn't have scheduled yet" + ); + + // Should schedule job after we poll wants + orchestrator.poll_wants().expect("poll wants"); + assert_eq!( + orchestrator.not_started_jobs.len(), + 1, + "should have scheduled job" + ); } } - // Orchestrator needs to be able to achieve high level orchestration use cases. + /// Orchestrator needs to be able to achieve high level orchestration use cases. mod orchestration { - // Use case: should run a job to produce a partition in reaction to a want, then have the - // want fulfilled. + /// Use case: should run a job to produce a partition in reaction to a want, then have the + /// want fulfilled. #[test] #[ignore] fn test_want_builds_partition() {