diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs
index b3ebc13..b3edb80 100644
--- a/databuild/build_event_log.rs
+++ b/databuild/build_event_log.rs
@@ -1,12 +1,11 @@
+use crate::build_state::BuildState;
 use crate::data_build_event::Event;
-use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
+use crate::util::{current_timestamp, DatabuildError};
+use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1};
 use prost::Message;
 use rusqlite::Connection;
-use std::error::Error;
 use std::fmt::Debug;
 use std::time::{SystemTime, UNIX_EPOCH};
-use crate::build_state::BuildState;
-use crate::util::{current_timestamp, DatabuildError};

 pub trait BELStorage {
     fn append_event(&mut self, event: &Event) -> Result;
@@ -237,11 +236,11 @@ impl Clone for BuildEventLog {
 mod tests {
     mod sqlite_bel_storage {
-        use uuid::Uuid;
         use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
+        use crate::build_state::BuildState;
         use crate::data_build_event::Event;
         use crate::{PartitionRef, WantCreateEventV1};
-        use crate::build_state::BuildState;
+        use uuid::Uuid;

         #[test]
         fn test_sqlite_append_event() {
diff --git a/databuild/build_state.rs b/databuild/build_state.rs
index 576b3fa..4e7701e 100644
--- a/databuild/build_state.rs
+++ b/databuild/build_state.rs
@@ -1,12 +1,16 @@
 use crate::data_build_event::Event;
-use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
-use crate::util::{current_timestamp, DatabuildError};
-use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
-use rusqlite::types::FromSql;
-use rusqlite::ToSql;
+use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
+use crate::util::{DatabuildError, current_timestamp};
+use crate::{
+    JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
+    JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
+    ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
+    ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
+    PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail,
+    WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
+};
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
-use
std::error::Error; /** Design Notes @@ -75,22 +79,31 @@ impl BuildState { } } - fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result, DatabuildError> { + fn handle_want_create( + &mut self, + event: &WantCreateEventV1, + ) -> Result, DatabuildError> { self.wants .insert(event.want_id.clone(), event.clone().into()); - Ok(vec!()) + Ok(vec![]) } - fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result, DatabuildError> { + fn handle_want_cancel( + &mut self, + event: &WantCancelEventV1, + ) -> Result, DatabuildError> { // TODO actually cancel in-progress job runs that no longer have a sponsoring want if let Some(want) = self.wants.get_mut(&event.want_id) { want.status = Some(WantStatusCode::WantCanceled.into()); want.last_updated_timestamp = current_timestamp(); } - Ok(vec!()) + Ok(vec![]) } - fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result, DatabuildError> { + fn handle_job_run_buffer( + &mut self, + event: &JobRunBufferEventV1, + ) -> Result, DatabuildError> { // No job run should exist if self.job_runs.get(&event.job_run_id).is_some() { Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into()) @@ -98,18 +111,18 @@ impl BuildState { // Create job run to be inserted let job_run: JobRunDetail = event.clone().into(); - for pref in job_run.building_partitions.iter() { - // Update all wants that point to this partition ref to `Building` - // Query notes: "update all wants that point to this partition to building" - if let Some(want) = self.wants.get_mut(&pref.r#ref) { + // Mark all servicing wants as WantBuilding + for wap in &job_run.servicing_wants { + if let Some(want) = self.wants.get_mut(&wap.want_id) { want.status = Some(WantStatusCode::WantBuilding.into()); + want.last_updated_timestamp = current_timestamp(); } } self.job_runs .insert(event.job_run_id.clone(), job_run.clone()); println!("Inserted job run: {:?}", job_run); - Ok(vec!()) + Ok(vec![]) } } @@ -196,7 +209,7 @@ impl BuildState { event: &JobRunHeartbeatEventV1, ) -> Result, DatabuildError> { self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?; - Ok(vec!()) + Ok(vec![]) } fn handle_job_run_success( @@ -206,15 +219,58 @@ impl BuildState { println!("Job run success event: {:?}", event); self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?; let job_run = self.get_job_run(&event.job_run_id).unwrap(); + + // Clone building_partitions before we use it multiple times + let newly_live_partitions: Vec = job_run.building_partitions.clone(); + // Update partitions being build by this job - for pref in job_run.building_partitions { + for pref in &newly_live_partitions { self.update_partition_status( - &pref, + pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id), )?; } - Ok(vec!()) + + // Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied + let wants_to_update: Vec = self + .wants + .iter() + .filter(|(_, want)| { + want.status.as_ref().map(|s| s.code) + == Some(WantStatusCode::WantUpstreamBuilding as i32) + }) + .filter(|(_, want)| { + // Check if this want was waiting for any of the newly live partitions + want.upstreams.iter().any(|upstream| { + newly_live_partitions + .iter() + .any(|p| p.r#ref == upstream.r#ref) + }) + }) + .map(|(want_id, _)| want_id.clone()) + .collect(); + + for want_id in wants_to_update { + if let Some(want) = self.wants.get_mut(&want_id) { + // Check if all upstreams are now satisfied + let all_upstreams_satisfied = 
want.upstreams.iter().all(|upstream| { + self.partitions + .get(&upstream.r#ref) + .and_then(|p| p.status.as_ref()) + .map(|s| s.code == PartitionStatusCode::PartitionLive as i32) + .unwrap_or(false) + }); + + if all_upstreams_satisfied { + // Transition back to WantIdle so it can be rescheduled + want.status = Some(WantStatusCode::WantIdle.into()); + want.last_updated_timestamp = current_timestamp(); + } + } + } + + Ok(vec![]) } fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> { @@ -236,17 +292,50 @@ impl BuildState { ) -> Result, DatabuildError> { self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?; let job_run = self.get_job_run(&event.job_run_id).unwrap(); - for pref in job_run.building_partitions { + + // Clone building_partitions before we use it multiple times + let failed_partitions: Vec = job_run.building_partitions.clone(); + + for pref in &failed_partitions { self.update_partition_status( - &pref, + pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id), )?; } - Ok(vec!()) + + // Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions + let wants_to_fail: Vec = self + .wants + .iter() + .filter(|(_, want)| { + want.status.as_ref().map(|s| s.code) + == Some(WantStatusCode::WantUpstreamBuilding as i32) + }) + .filter(|(_, want)| { + // Check if this want was waiting for any of the failed partitions + want.upstreams + .iter() + .any(|upstream| failed_partitions.iter().any(|p| p.r#ref == upstream.r#ref)) + }) + .map(|(want_id, _)| want_id.clone()) + .collect(); + + for want_id in wants_to_fail { + if let Some(want) = self.wants.get_mut(&want_id) { + // Transition to WantUpstreamFailed since a dependency failed + want.status = Some(WantStatusCode::WantUpstreamFailed.into()); + want.last_updated_timestamp = current_timestamp(); + } + } + + Ok(vec![]) } - fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result, DatabuildError> { + fn handle_job_run_cancel( + &mut self, + event: &JobRunCancelEventV1, + ) -> Result, DatabuildError> { todo!("should update already inserted job run, partition status, want status") } @@ -254,12 +343,10 @@ impl BuildState { &mut self, event: &JobRunMissingDepsEventV1, ) -> Result, DatabuildError> { - let job_run_detail = self - .get_job_run(&event.job_run_id) - .ok_or(format!( - "Unable to find job run with id `{}`", - event.job_run_id - ))?; + let job_run_detail = self.get_job_run(&event.job_run_id).ok_or(format!( + "Unable to find job run with id `{}`", + event.job_run_id + ))?; // Infer data/SLA timestamps from upstream want let want_timestamps: WantTimestamps = job_run_detail .servicing_wants @@ -267,6 +354,40 @@ impl BuildState { .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into())) .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) .ok_or(format!("No servicing wants found"))?; + + // Update servicing wants to track missing dependencies as upstreams + for servicing_want in &job_run_detail.servicing_wants { + if let Some(want) = self.wants.get_mut(&servicing_want.want_id) { + let mut want_is_impacted = false; + for missing_dep in &event.missing_deps { + // Only update this want if it contains an impacted partition + let impacted = missing_dep + .impacted + .iter() + .any(|impacted| want.partitions.iter().any(|p| p.r#ref == impacted.r#ref)); + + if impacted { + want_is_impacted = true; + // Add missing partitions to upstreams + for missing_partition in &missing_dep.missing { + 
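+                            // Track each missing partition as an upstream of this want; duplicates are pruned below.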
want.upstreams.push(missing_partition.clone()); + } + } + } + + // Dedupe upstreams + let mut seen = std::collections::HashSet::new(); + want.upstreams.retain(|p| seen.insert(p.r#ref.clone())); + + // Set impacted wants to WantUpstreamBuilding so they won't be rescheduled + // until their dependencies are ready + if want_is_impacted { + want.status = Some(WantStatusCode::WantUpstreamBuilding.into()); + want.last_updated_timestamp = current_timestamp(); + } + } + } + // Create wants from dep misses let want_events = missing_deps_to_want_events( event.missing_deps.clone(), @@ -277,11 +398,17 @@ impl BuildState { Ok(want_events) } - fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result, DatabuildError> { + fn handle_taint_create( + &mut self, + event: &TaintCreateEventV1, + ) -> Result, DatabuildError> { todo!("...?") } - fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result, DatabuildError> { + fn handle_taint_delete( + &mut self, + event: &TaintCancelEventV1, + ) -> Result, DatabuildError> { todo!("...?") } @@ -389,14 +516,12 @@ impl BuildState { WantsSchedulability( self.wants .values() - // Do not consider fulfilled or currently building wants in schedulability query + // Only consider idle wants for schedulability - all other states are either + // terminal (successful/failed/canceled/upstream failed) or waiting for an event + // (building/upstream building) .filter(|w| { w.status.clone().expect("want must have status").code - != WantStatusCode::WantSuccessful as i32 - }) - .filter(|w| { - w.status.clone().expect("want must have status").code - != WantStatusCode::WantBuilding as i32 + == WantStatusCode::WantIdle as i32 }) .cloned() .map(|w| self.want_schedulability(&w)) @@ -531,6 +656,105 @@ mod tests { assert!(!ws.is_schedulable()); } + #[test] + fn test_want_not_schedulable_after_dep_miss_until_deps_exist() { + use crate::{ + JobRunDetail, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions, + }; + use std::collections::BTreeMap; + + // Given: A want with a job run that had a dep miss + let beta_partition = "data/beta"; + let alpha_partition = "data/alpha"; + let want_id = "beta_want"; + let job_run_id = "job_123"; + + let mut state = BuildState::default().with_wants(BTreeMap::from([( + want_id.to_string(), + WantDetail::default() + .with_partitions(vec![beta_partition.into()]) + .with_status(Some(WantStatusCode::WantIdle.into())), + )])); + + // Job run exists for this want + state.job_runs.insert( + job_run_id.to_string(), + JobRunDetail { + id: job_run_id.to_string(), + servicing_wants: vec![WantAttributedPartitions { + want_id: want_id.to_string(), + partitions: vec![beta_partition.into()], + }], + ..Default::default() + }, + ); + + // Initially, want should be schedulable (no known upstreams) + let schedulability = state.wants_schedulability(); + assert_eq!(schedulability.0.len(), 1); + assert!( + schedulability.0[0].is_schedulable(), + "want should be schedulable before dep miss" + ); + + // When: Job run fails with dep miss indicating it needs alpha + let dep_miss_event = JobRunMissingDepsEventV1 { + job_run_id: job_run_id.to_string(), + missing_deps: vec![MissingDeps { + impacted: vec![beta_partition.into()], + missing: vec![alpha_partition.into()], + }], + read_deps: vec![], + }; + + let new_events = state.handle_job_run_dep_miss(&dep_miss_event).unwrap(); + for event in new_events { + state.handle_event(&event).unwrap(); + } + + // Then: Beta want should be in WantUpstreamBuilding status and not in schedulability list + let 
schedulability = state.wants_schedulability(); + + // The schedulability list should contain the newly created alpha want, but not the beta want + let has_beta_want = schedulability + .0 + .iter() + .any(|ws| ws.want.partitions.iter().any(|p| p.r#ref == beta_partition)); + assert!( + !has_beta_want, + "beta want should not appear in schedulability list when in WantUpstreamBuilding status" + ); + + // The alpha want should be schedulable + let has_alpha_want = schedulability.0.iter().any(|ws| { + ws.want + .partitions + .iter() + .any(|p| p.r#ref == alpha_partition) + }); + assert!( + has_alpha_want, + "alpha want should be schedulable (newly created from dep miss)" + ); + + // Verify the beta want is now in WantUpstreamBuilding status + let beta_want = state.wants.get(want_id).expect("beta want should exist"); + assert_eq!( + beta_want.status.as_ref().unwrap().code, + WantStatusCode::WantUpstreamBuilding as i32, + "want should be in WantUpstreamBuilding status after dep miss" + ); + assert_eq!( + beta_want.upstreams.len(), + 1, + "want should have one upstream" + ); + assert_eq!( + beta_want.upstreams[0].r#ref, alpha_partition, + "upstream should be alpha" + ); + } + #[test] #[ignore] fn test_simple_want_with_tainted_upstream_is_not_schedulable() { @@ -569,7 +793,9 @@ mod tests { e.partitions = vec!["mypart".into()]; let mut state = BuildState::default(); - state.handle_event(&e.clone().into()); + state + .handle_event(&e.clone().into()) + .expect("want create failed"); let want = state.get_want("1234").unwrap(); let mut expected: WantDetail = e.into(); // Into will set this field as current timestamp @@ -584,12 +810,16 @@ mod tests { e.partitions = vec!["mypart".into()]; let mut state = BuildState::default(); - state.handle_event(&e.clone().into()); + state + .handle_event(&e.clone().into()) + .expect("want create failed"); // Should be able to cancel let mut e = WantCancelEventV1::default(); e.want_id = "1234".to_string(); - state.handle_event(&e.clone().into()); + state + .handle_event(&e.clone().into()) + .expect("want cancel failed"); let want = state.get_want("1234").unwrap(); assert_eq!( diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 17cdc92..62f87d7 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -181,6 +181,8 @@ enum WantStatusCode { WantFailed = 2; WantSuccessful = 3; WantCanceled = 4; + WantUpstreamBuilding = 5; + WantUpstreamFailed = 6; } message WantDetail { diff --git a/databuild/event_defaults.rs b/databuild/event_defaults.rs deleted file mode 100644 index 0104b87..0000000 --- a/databuild/event_defaults.rs +++ /dev/null @@ -1,2 +0,0 @@ -use uuid::Uuid; -use crate::{PartitionRef, WantCreateEventV1}; diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 0294b44..9a96146 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,12 +1,11 @@ use crate::data_build_event::Event; use crate::data_deps::JobRunDataDepResults; +use crate::util::DatabuildError; use crate::{ EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, JobRunSuccessEventV1, MissingDeps, ReadDeps, }; -use crate::util::DatabuildError; use std::collections::HashMap; -use std::error::Error; use std::io::{BufRead, BufReader}; use std::marker::PhantomData; use std::process::{Child, Command, Stdio}; diff --git a/databuild/lib.rs b/databuild/lib.rs index 19ef707..0479561 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -5,7 +5,6 @@ mod job; mod util; mod build_state; mod event_transforms; -mod 
event_defaults; mod data_deps; mod mock_job_run; diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 9e030e9..006f3d5 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1,12 +1,13 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage}; -use crate::build_state::BuildState; use crate::data_build_event::Event; use crate::job::JobConfiguration; -use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend}; +use crate::job_run::{ + CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, + RunningJobRun, SubProcessBackend, +}; use crate::util::DatabuildError; use crate::{JobRunBufferEventV1, PartitionRef, WantDetail}; use std::collections::HashMap; -use std::error::Error; use std::fmt::Debug; /** @@ -176,7 +177,8 @@ impl Orchestrator { JobRunVisitResult::Completed(completed) => { // Emit success event println!("Completed job: {:?}", completed.id()); - self.bel.append_event(&completed.state.to_event(&completed.id()))?; + self.bel + .append_event(&completed.state.to_event(&completed.id()))?; self.completed_jobs.push(completed); } JobRunVisitResult::Failed(failed) => { @@ -447,12 +449,6 @@ mod tests { /// Orchestrator want creation is the means of data dependency propagation, allowing the /// orchestrator to create partitions needed by jobs that produce the existing wanted partitions. mod want_create { - use crate::data_build_event::Event; - use crate::job_run::{DepMissJobRun, SubProcessDepMiss}; - use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b}; - use crate::{JobRunBufferEventV1, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1}; - use std::marker::PhantomData; - use uuid::Uuid; // /// Use case: The orchestrator should map a failed job into a set of wants // #[test] @@ -609,17 +605,86 @@ mod tests { ); } + /// Helper to wait for running jobs to complete with timeout + fn wait_for_jobs_to_complete( + orchestrator: &mut crate::orchestrator::Orchestrator, + max_steps: usize, + ) -> Result<(), String> { + use std::thread; + use std::time::Duration; + + for _i in 0..max_steps { + thread::sleep(Duration::from_millis(50)); + if orchestrator.running_jobs.is_empty() { + return Ok(()); + } + orchestrator + .step() + .map_err(|e| format!("step failed: {}", e))?; + } + + Err(format!("Jobs did not complete after {} steps", max_steps)) + } + // Use case: a graph with multi-hop deps should achieve the multi-hop build // - Job B depends on part_a produced by job A // - Job B should be attempted, fail, and create a want for part_a // - Job A should be attempted, succeed, and produce part_a // - Job B should be attempted, succeed, and produce part_b #[test] - #[ignore] fn test_multi_hop_want_builds_partition() { - // Given: Set up orchestrator with alpha and beta jobs - // In this scenario: beta depends on alpha - let mut orchestrator = setup_scenario_a_to_b(build_orchestrator()); + use crate::job::JobConfiguration; + use crate::orchestrator::OrchestratorConfig; + use std::fs; + use std::os::unix::fs::PermissionsExt; + + // Clean up marker file from any previous runs + let marker_file = "/tmp/databuild_test_alpha_complete"; + let _ = fs::remove_file(marker_file); + + // Create inline test scripts in /tmp + let alpha_script = "/tmp/test_job_alpha.sh"; + let beta_script = "/tmp/test_job_beta.sh"; + + // Alpha job: creates marker file and outputs success + fs::write( + alpha_script, + 
r#"#!/bin/bash +touch /tmp/databuild_test_alpha_complete +echo '{"DataDepLogLine":{"Success":{"version":"1","produced_partitions":["data/alpha"]}}}' +"#, + ) + .unwrap(); + + // Beta job: checks for alpha marker, outputs dep miss if not found + fs::write(beta_script, r#"#!/bin/bash +if [ ! -f /tmp/databuild_test_alpha_complete ]; then + echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}' + exit 1 +fi +echo 'Beta succeeded' +"#).unwrap(); + + // Make scripts executable + fs::set_permissions(alpha_script, fs::Permissions::from_mode(0o755)).unwrap(); + fs::set_permissions(beta_script, fs::Permissions::from_mode(0o755)).unwrap(); + + // Given: Set up orchestrator with alpha and beta jobs using test scripts + let mut orchestrator = build_orchestrator(); + orchestrator.config = OrchestratorConfig { + jobs: vec![ + JobConfiguration { + label: "alpha".to_string(), + patterns: vec!["data/alpha".to_string()], + entry_point: alpha_script.to_string(), + }, + JobConfiguration { + label: "beta".to_string(), + patterns: vec!["data/beta".to_string()], + entry_point: beta_script.to_string(), + }, + ], + }; let partition_beta = "data/beta"; let partition_alpha = "data/alpha"; @@ -636,70 +701,101 @@ mod tests { // When: Run orchestrator steps to let it naturally handle the multi-hop build // Step 1: Should schedule beta job (want -> not_started_jobs) orchestrator.step().expect("step 1"); - assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta job should be queued"); + assert_eq!( + orchestrator.not_started_jobs.len(), + 1, + "beta job should be queued" + ); // Step 2: Should start beta job (not_started_jobs -> running_jobs) orchestrator.step().expect("step 2"); - assert_eq!(orchestrator.running_jobs.len(), 1, "beta job should be running"); - - // Step 3: Beta job detects missing alpha dep and creates want - thread::sleep(Duration::from_millis(10)); - orchestrator.step().expect("step 3"); - // (Beta should now be in dep_miss state, and a want for alpha should be created) - assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "beta should have dep miss"); - - // Verify want for alpha was created - let wants = orchestrator.bel.state.wants_schedulability().schedulable_wants(); - assert!( - wants.iter().any(|w| w.partitions.iter().any(|p| p.r#ref == partition_alpha)), - "should create want for alpha partition" + assert_eq!( + orchestrator.running_jobs.len(), + 1, + "beta job should be running" ); - // Step 4: Should schedule alpha job (want -> not_started_jobs) - orchestrator.step().expect("step 4"); - assert_eq!(orchestrator.not_started_jobs.len(), 1, "alpha job should be queued"); + // Step 3: Beta job detects missing alpha dep and creates want + wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete"); + // (Beta should now be in dep_miss state, and a want for alpha should be created) + assert_eq!( + orchestrator.dep_miss_jobs.len(), + 1, + "beta should have dep miss" + ); - // Step 5: Should start alpha job (not_started_jobs -> running_jobs) - orchestrator.step().expect("step 5"); - assert_eq!(orchestrator.running_jobs.len(), 1, "alpha job should be running"); + // Step 4: Should schedule and start alpha job + // (dep miss handler created the alpha want, which will be picked up by poll_wants) + orchestrator.step().expect("step 4"); + assert_eq!( + orchestrator.running_jobs.len(), + 1, + "alpha job should be running" + ); // Step 6: Alpha completes successfully - 
thread::sleep(Duration::from_millis(10)); - orchestrator.step().expect("step 6"); - assert_eq!(orchestrator.completed_jobs.len(), 1, "alpha should complete"); + wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete"); assert_eq!( - orchestrator.bel.state.get_partition(partition_alpha).unwrap().status, + orchestrator.completed_jobs.len(), + 1, + "alpha should complete" + ); + assert_eq!( + orchestrator + .bel + .state + .get_partition(partition_alpha) + .unwrap() + .status, Some(PartitionStatusCode::PartitionLive.into()), "alpha partition should be live" ); - // Step 7: Beta is rescheduled (want -> not_started_jobs) + // Step 7: Beta is rescheduled and started (want -> running_jobs) orchestrator.step().expect("step 7"); - assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta should be queued for retry"); - - // Step 8: Beta starts running (not_started_jobs -> running_jobs) - orchestrator.step().expect("step 8"); assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running"); - // Step 9: Beta completes successfully - thread::sleep(Duration::from_millis(10)); - orchestrator.step().expect("step 9"); + // Step 8: Beta completes successfully + wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete"); // Then: Verify both partitions are live and both jobs completed - assert_eq!(orchestrator.completed_jobs.len(), 2, "both jobs should complete"); - assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "should have one dep miss"); + assert_eq!( + orchestrator.completed_jobs.len(), + 2, + "both jobs should complete" + ); + assert_eq!( + orchestrator.dep_miss_jobs.len(), + 1, + "should have one dep miss" + ); assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail"); assert_eq!( - orchestrator.bel.state.get_partition(partition_alpha).unwrap().status, + orchestrator + .bel + .state + .get_partition(partition_alpha) + .unwrap() + .status, Some(PartitionStatusCode::PartitionLive.into()), "alpha partition should be live" ); assert_eq!( - orchestrator.bel.state.get_partition(partition_beta).unwrap().status, + orchestrator + .bel + .state + .get_partition(partition_beta) + .unwrap() + .status, Some(PartitionStatusCode::PartitionLive.into()), "beta partition should be live after multi-hop build" ); + + // Cleanup + let _ = fs::remove_file(marker_file); + let _ = fs::remove_file(alpha_script); + let _ = fs::remove_file(beta_script); } }
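
Taken together, the build_state changes amount to a small state machine over want statuses: a dep miss moves the impacted want from WantIdle to WantUpstreamBuilding and records the missing partitions as upstreams; a later job-run success that makes every upstream live returns the want to WantIdle, while a job-run failure on any upstream moves it to WantUpstreamFailed; and wants_schedulability now only considers WantIdle wants. The sketch below condenses that logic into a self-contained example. The enum variant names mirror WantStatusCode in databuild.proto, but the types and the next_status helper are illustrative only and are not part of the crate.

// Illustrative sketch only: a condensed model of the want-status transitions
// introduced by this patch. Not part of the databuild crate.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WantStatus {
    Idle,             // schedulable
    Building,         // a job run is servicing the want
    UpstreamBuilding, // blocked: waiting on partitions reported via a dep miss
    UpstreamFailed,   // an upstream partition failed to build
    Successful,
    Failed,
    Canceled,
}

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PartitionStatus {
    Live,
    Building,
    Failed,
}

/// Recompute a blocked want's status after a job run finishes, given the
/// current status of each of its upstream partitions.
fn next_status(current: WantStatus, upstreams: &[PartitionStatus]) -> WantStatus {
    match current {
        WantStatus::UpstreamBuilding => {
            if upstreams.iter().any(|s| *s == PartitionStatus::Failed) {
                // A dependency failed, so the want can no longer succeed as-is.
                WantStatus::UpstreamFailed
            } else if upstreams.iter().all(|s| *s == PartitionStatus::Live) {
                // Every dependency is live again, so the want becomes schedulable.
                WantStatus::Idle
            } else {
                WantStatus::UpstreamBuilding
            }
        }
        // All other statuses are either terminal or waiting on their own job run.
        other => other,
    }
}

fn main() {
    // A want blocked on one live and one failed upstream ends up UpstreamFailed.
    let status = next_status(
        WantStatus::UpstreamBuilding,
        &[PartitionStatus::Live, PartitionStatus::Failed],
    );
    assert_eq!(status, WantStatus::UpstreamFailed);

    // Once every upstream is live, the want returns to Idle and can be rescheduled.
    let status = next_status(
        WantStatus::UpstreamBuilding,
        &[PartitionStatus::Live, PartitionStatus::Live],
    );
    assert_eq!(status, WantStatus::Idle);
}

Restricting the schedulability query to WantIdle is what keeps a blocked want from being handed back to the orchestrator while its upstreams are still building; the new test_multi_hop_want_builds_partition test exercises that loop end to end.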