implement multi-hop orchestrator test

Stuart Axelbrooke 2025-11-21 08:12:13 +08:00
parent a43e9fb6ea
commit 8208af6605
7 changed files with 425 additions and 102 deletions
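
The mechanism the new test exercises: a job run that reports missing dependencies moves its sponsoring wants into the new WantUpstreamBuilding state and records the missing partitions as upstreams; when an upstream job later succeeds the want returns to WantIdle, and if the upstream fails it moves to WantUpstreamFailed. A rough standalone sketch of those transitions, using simplified stand-ins rather than the real WantStatusCode/BuildState types from the diff:

// Illustrative stand-ins for the want lifecycle added in this commit; not the
// generated proto WantStatusCode or the BuildState handlers themselves.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WantState {
    Idle,             // schedulable: wants_schedulability() only considers this state
    Building,         // a buffered job run is servicing the want
    UpstreamBuilding, // a dep miss was reported; waiting for upstream partitions
    UpstreamFailed,   // an upstream partition failed to build
}

// JobRunMissingDepsEventV1: impacted wants leave the schedulable pool.
fn on_dep_miss(s: WantState) -> WantState {
    match s {
        WantState::Building => WantState::UpstreamBuilding,
        other => other,
    }
}

// JobRunSuccessEventV1: once every recorded upstream is live, the want goes
// back to Idle so the orchestrator can reschedule it.
fn on_upstream_success(s: WantState, all_upstreams_live: bool) -> WantState {
    match (s, all_upstreams_live) {
        (WantState::UpstreamBuilding, true) => WantState::Idle,
        (state, _) => state,
    }
}

// JobRunFailureEventV1 for a partition the want was waiting on.
fn on_upstream_failure(s: WantState) -> WantState {
    match s {
        WantState::UpstreamBuilding => WantState::UpstreamFailed,
        other => other,
    }
}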

View file

@@ -1,12 +1,11 @@
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
use crate::util::{current_timestamp, DatabuildError};
use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1};
use prost::Message;
use rusqlite::Connection;
use std::error::Error;
use std::fmt::Debug;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::build_state::BuildState;
use crate::util::{current_timestamp, DatabuildError};
pub trait BELStorage {
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
@@ -237,11 +236,11 @@ impl Clone for BuildEventLog<MemoryBELStorage>
mod tests {
mod sqlite_bel_storage {
use uuid::Uuid;
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::{PartitionRef, WantCreateEventV1};
use crate::build_state::BuildState;
use uuid::Uuid;
#[test]
fn test_sqlite_append_event() {

View file

@@ -1,12 +1,16 @@
use crate::data_build_event::Event;
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::util::{current_timestamp, DatabuildError};
use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::FromSql;
use rusqlite::ToSql;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::util::{DatabuildError, current_timestamp};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail,
WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error;
/**
Design Notes
@@ -75,22 +79,31 @@ impl BuildState {
}
}
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<Vec<Event>, DatabuildError> {
fn handle_want_create(
&mut self,
event: &WantCreateEventV1,
) -> Result<Vec<Event>, DatabuildError> {
self.wants
.insert(event.want_id.clone(), event.clone().into());
Ok(vec!())
Ok(vec![])
}
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
fn handle_want_cancel(
&mut self,
event: &WantCancelEventV1,
) -> Result<Vec<Event>, DatabuildError> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
}
Ok(vec!())
Ok(vec![])
}
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<Vec<Event>, DatabuildError> {
fn handle_job_run_buffer(
&mut self,
event: &JobRunBufferEventV1,
) -> Result<Vec<Event>, DatabuildError> {
// No job run should exist
if self.job_runs.get(&event.job_run_id).is_some() {
Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
@@ -98,18 +111,18 @@ impl BuildState {
// Create job run to be inserted
let job_run: JobRunDetail = event.clone().into();
for pref in job_run.building_partitions.iter() {
// Update all wants that point to this partition ref to `Building`
// Query notes: "update all wants that point to this partition to building"
if let Some(want) = self.wants.get_mut(&pref.r#ref) {
// Mark all servicing wants as WantBuilding
for wap in &job_run.servicing_wants {
if let Some(want) = self.wants.get_mut(&wap.want_id) {
want.status = Some(WantStatusCode::WantBuilding.into());
want.last_updated_timestamp = current_timestamp();
}
}
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
Ok(vec!())
Ok(vec![])
}
}
@@ -196,7 +209,7 @@ impl BuildState {
event: &JobRunHeartbeatEventV1,
) -> Result<Vec<Event>, DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?;
Ok(vec!())
Ok(vec![])
}
fn handle_job_run_success(
@@ -206,15 +219,58 @@ impl BuildState {
println!("Job run success event: {:?}", event);
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Clone building_partitions before we use it multiple times
let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
// Update partitions being built by this job
for pref in job_run.building_partitions {
for pref in &newly_live_partitions {
self.update_partition_status(
&pref,
pref,
PartitionStatusCode::PartitionLive,
Some(&event.job_run_id),
)?;
}
Ok(vec!())
// Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
let wants_to_update: Vec<String> = self
.wants
.iter()
.filter(|(_, want)| {
want.status.as_ref().map(|s| s.code)
== Some(WantStatusCode::WantUpstreamBuilding as i32)
})
.filter(|(_, want)| {
// Check if this want was waiting for any of the newly live partitions
want.upstreams.iter().any(|upstream| {
newly_live_partitions
.iter()
.any(|p| p.r#ref == upstream.r#ref)
})
})
.map(|(want_id, _)| want_id.clone())
.collect();
for want_id in wants_to_update {
if let Some(want) = self.wants.get_mut(&want_id) {
// Check if all upstreams are now satisfied
let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
self.partitions
.get(&upstream.r#ref)
.and_then(|p| p.status.as_ref())
.map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
.unwrap_or(false)
});
if all_upstreams_satisfied {
// Transition back to WantIdle so it can be rescheduled
want.status = Some(WantStatusCode::WantIdle.into());
want.last_updated_timestamp = current_timestamp();
}
}
}
Ok(vec![])
}
fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
@@ -236,17 +292,50 @@ impl BuildState {
) -> Result<Vec<Event>, DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
for pref in job_run.building_partitions {
// Clone building_partitions before we use it multiple times
let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
for pref in &failed_partitions {
self.update_partition_status(
&pref,
pref,
PartitionStatusCode::PartitionFailed,
Some(&event.job_run_id),
)?;
}
Ok(vec!())
// Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
let wants_to_fail: Vec<String> = self
.wants
.iter()
.filter(|(_, want)| {
want.status.as_ref().map(|s| s.code)
== Some(WantStatusCode::WantUpstreamBuilding as i32)
})
.filter(|(_, want)| {
// Check if this want was waiting for any of the failed partitions
want.upstreams
.iter()
.any(|upstream| failed_partitions.iter().any(|p| p.r#ref == upstream.r#ref))
})
.map(|(want_id, _)| want_id.clone())
.collect();
for want_id in wants_to_fail {
if let Some(want) = self.wants.get_mut(&want_id) {
// Transition to WantUpstreamFailed since a dependency failed
want.status = Some(WantStatusCode::WantUpstreamFailed.into());
want.last_updated_timestamp = current_timestamp();
}
}
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
Ok(vec![])
}
fn handle_job_run_cancel(
&mut self,
event: &JobRunCancelEventV1,
) -> Result<Vec<Event>, DatabuildError> {
todo!("should update already inserted job run, partition status, want status")
}
@@ -254,9 +343,7 @@ impl BuildState {
&mut self,
event: &JobRunMissingDepsEventV1,
) -> Result<Vec<Event>, DatabuildError> {
let job_run_detail = self
.get_job_run(&event.job_run_id)
.ok_or(format!(
let job_run_detail = self.get_job_run(&event.job_run_id).ok_or(format!(
"Unable to find job run with id `{}`",
event.job_run_id
))?;
@@ -267,6 +354,40 @@ impl BuildState {
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No servicing wants found"))?;
// Update servicing wants to track missing dependencies as upstreams
for servicing_want in &job_run_detail.servicing_wants {
if let Some(want) = self.wants.get_mut(&servicing_want.want_id) {
let mut want_is_impacted = false;
for missing_dep in &event.missing_deps {
// Only update this want if it contains an impacted partition
let impacted = missing_dep
.impacted
.iter()
.any(|impacted| want.partitions.iter().any(|p| p.r#ref == impacted.r#ref));
if impacted {
want_is_impacted = true;
// Add missing partitions to upstreams
for missing_partition in &missing_dep.missing {
want.upstreams.push(missing_partition.clone());
}
}
}
// Dedupe upstreams
let mut seen = std::collections::HashSet::new();
want.upstreams.retain(|p| seen.insert(p.r#ref.clone()));
// Set impacted wants to WantUpstreamBuilding so they won't be rescheduled
// until their dependencies are ready
if want_is_impacted {
want.status = Some(WantStatusCode::WantUpstreamBuilding.into());
want.last_updated_timestamp = current_timestamp();
}
}
}
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
event.missing_deps.clone(),
@@ -277,11 +398,17 @@ impl BuildState {
Ok(want_events)
}
fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<Vec<Event>, DatabuildError> {
fn handle_taint_create(
&mut self,
event: &TaintCreateEventV1,
) -> Result<Vec<Event>, DatabuildError> {
todo!("...?")
}
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
fn handle_taint_delete(
&mut self,
event: &TaintCancelEventV1,
) -> Result<Vec<Event>, DatabuildError> {
todo!("...?")
}
@@ -389,14 +516,12 @@ impl BuildState {
WantsSchedulability(
self.wants
.values()
// Do not consider fulfilled or currently building wants in schedulability query
// Only consider idle wants for schedulability - all other states are either
// terminal (successful/failed/canceled/upstream failed) or waiting for an event
// (building/upstream building)
.filter(|w| {
w.status.clone().expect("want must have status").code
!= WantStatusCode::WantSuccessful as i32
})
.filter(|w| {
w.status.clone().expect("want must have status").code
!= WantStatusCode::WantBuilding as i32
== WantStatusCode::WantIdle as i32
})
.cloned()
.map(|w| self.want_schedulability(&w))
@@ -531,6 +656,105 @@ mod tests {
assert!(!ws.is_schedulable());
}
#[test]
fn test_want_not_schedulable_after_dep_miss_until_deps_exist() {
use crate::{
JobRunDetail, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions,
};
use std::collections::BTreeMap;
// Given: A want with a job run that had a dep miss
let beta_partition = "data/beta";
let alpha_partition = "data/alpha";
let want_id = "beta_want";
let job_run_id = "job_123";
let mut state = BuildState::default().with_wants(BTreeMap::from([(
want_id.to_string(),
WantDetail::default()
.with_partitions(vec![beta_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]));
// Job run exists for this want
state.job_runs.insert(
job_run_id.to_string(),
JobRunDetail {
id: job_run_id.to_string(),
servicing_wants: vec![WantAttributedPartitions {
want_id: want_id.to_string(),
partitions: vec![beta_partition.into()],
}],
..Default::default()
},
);
// Initially, want should be schedulable (no known upstreams)
let schedulability = state.wants_schedulability();
assert_eq!(schedulability.0.len(), 1);
assert!(
schedulability.0[0].is_schedulable(),
"want should be schedulable before dep miss"
);
// When: Job run fails with dep miss indicating it needs alpha
let dep_miss_event = JobRunMissingDepsEventV1 {
job_run_id: job_run_id.to_string(),
missing_deps: vec![MissingDeps {
impacted: vec![beta_partition.into()],
missing: vec![alpha_partition.into()],
}],
read_deps: vec![],
};
let new_events = state.handle_job_run_dep_miss(&dep_miss_event).unwrap();
for event in new_events {
state.handle_event(&event).unwrap();
}
// Then: Beta want should be in WantUpstreamBuilding status and not in schedulability list
let schedulability = state.wants_schedulability();
// The schedulability list should contain the newly created alpha want, but not the beta want
let has_beta_want = schedulability
.0
.iter()
.any(|ws| ws.want.partitions.iter().any(|p| p.r#ref == beta_partition));
assert!(
!has_beta_want,
"beta want should not appear in schedulability list when in WantUpstreamBuilding status"
);
// The alpha want should be schedulable
let has_alpha_want = schedulability.0.iter().any(|ws| {
ws.want
.partitions
.iter()
.any(|p| p.r#ref == alpha_partition)
});
assert!(
has_alpha_want,
"alpha want should be schedulable (newly created from dep miss)"
);
// Verify the beta want is now in WantUpstreamBuilding status
let beta_want = state.wants.get(want_id).expect("beta want should exist");
assert_eq!(
beta_want.status.as_ref().unwrap().code,
WantStatusCode::WantUpstreamBuilding as i32,
"want should be in WantUpstreamBuilding status after dep miss"
);
assert_eq!(
beta_want.upstreams.len(),
1,
"want should have one upstream"
);
assert_eq!(
beta_want.upstreams[0].r#ref, alpha_partition,
"upstream should be alpha"
);
}
#[test]
#[ignore]
fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
@@ -569,7 +793,9 @@ mod tests {
e.partitions = vec!["mypart".into()];
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
state
.handle_event(&e.clone().into())
.expect("want create failed");
let want = state.get_want("1234").unwrap();
let mut expected: WantDetail = e.into();
// Into will set this field as current timestamp
@@ -584,12 +810,16 @@ mod tests {
e.partitions = vec!["mypart".into()];
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
state
.handle_event(&e.clone().into())
.expect("want create failed");
// Should be able to cancel
let mut e = WantCancelEventV1::default();
e.want_id = "1234".to_string();
state.handle_event(&e.clone().into());
state
.handle_event(&e.clone().into())
.expect("want cancel failed");
let want = state.get_want("1234").unwrap();
assert_eq!(

View file

@@ -181,6 +181,8 @@ enum WantStatusCode {
WantFailed = 2;
WantSuccessful = 3;
WantCanceled = 4;
WantUpstreamBuilding = 5;
WantUpstreamFailed = 6;
}
message WantDetail {
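
Read alongside the wants_schedulability change in BuildState: WantBuilding and WantUpstreamBuilding are waiting on job-run events, while WantFailed, WantSuccessful, WantCanceled, and WantUpstreamFailed are effectively terminal, so only idle wants remain scheduling candidates. A minimal standalone illustration of that rule (the enum is a stand-in, not the generated proto type):

// Stand-in enum, not the generated WantStatusCode.
enum Status {
    Idle,
    Building,
    Failed,
    Successful,
    Canceled,
    UpstreamBuilding,
    UpstreamFailed,
}

// Mirrors the wants_schedulability() filter: only idle wants are candidates;
// Building/UpstreamBuilding wait for an event, the rest are terminal.
fn schedulable_candidate(status: &Status) -> bool {
    matches!(status, Status::Idle)
}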

View file

@@ -1,2 +0,0 @@
use uuid::Uuid;
use crate::{PartitionRef, WantCreateEventV1};

View file

@@ -1,12 +1,11 @@
use crate::data_build_event::Event;
use crate::data_deps::JobRunDataDepResults;
use crate::util::DatabuildError;
use crate::{
EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus,
JobRunSuccessEventV1, MissingDeps, ReadDeps,
};
use crate::util::DatabuildError;
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::marker::PhantomData;
use std::process::{Child, Command, Stdio};

View file

@@ -5,7 +5,6 @@ mod job;
mod util;
mod build_state;
mod event_transforms;
mod event_defaults;
mod data_deps;
mod mock_job_run;

View file

@@ -1,12 +1,13 @@
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend};
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun,
RunningJobRun, SubProcessBackend,
};
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
/**
@@ -176,7 +177,8 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
JobRunVisitResult::Completed(completed) => {
// Emit success event
println!("Completed job: {:?}", completed.id());
self.bel.append_event(&completed.state.to_event(&completed.id()))?;
self.bel
.append_event(&completed.state.to_event(&completed.id()))?;
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
@@ -447,12 +449,6 @@ mod tests {
/// Orchestrator want creation is the means of data dependency propagation, allowing the
/// orchestrator to create partitions needed by jobs that produce the existing wanted partitions.
mod want_create {
use crate::data_build_event::Event;
use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{JobRunBufferEventV1, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1};
use std::marker::PhantomData;
use uuid::Uuid;
// /// Use case: The orchestrator should map a failed job into a set of wants
// #[test]
@@ -609,17 +605,86 @@ mod tests {
);
}
/// Helper to wait for running jobs to complete with timeout
fn wait_for_jobs_to_complete<S: crate::build_event_log::BELStorage + std::fmt::Debug>(
orchestrator: &mut crate::orchestrator::Orchestrator<S>,
max_steps: usize,
) -> Result<(), String> {
use std::thread;
use std::time::Duration;
for _i in 0..max_steps {
thread::sleep(Duration::from_millis(50));
if orchestrator.running_jobs.is_empty() {
return Ok(());
}
orchestrator
.step()
.map_err(|e| format!("step failed: {}", e))?;
}
Err(format!("Jobs did not complete after {} steps", max_steps))
}
// Use case: a graph with multi-hop deps should achieve the multi-hop build
// - Job B depends on part_a produced by job A
// - Job B should be attempted, fail, and create a want for part_a
// - Job A should be attempted, succeed, and produce part_a
// - Job B should be attempted, succeed, and produce part_b
#[test]
#[ignore]
fn test_multi_hop_want_builds_partition() {
// Given: Set up orchestrator with alpha and beta jobs
// In this scenario: beta depends on alpha
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
use crate::job::JobConfiguration;
use crate::orchestrator::OrchestratorConfig;
use std::fs;
use std::os::unix::fs::PermissionsExt;
// Clean up marker file from any previous runs
let marker_file = "/tmp/databuild_test_alpha_complete";
let _ = fs::remove_file(marker_file);
// Create inline test scripts in /tmp
let alpha_script = "/tmp/test_job_alpha.sh";
let beta_script = "/tmp/test_job_beta.sh";
// Alpha job: creates marker file and outputs success
fs::write(
alpha_script,
r#"#!/bin/bash
touch /tmp/databuild_test_alpha_complete
echo '{"DataDepLogLine":{"Success":{"version":"1","produced_partitions":["data/alpha"]}}}'
"#,
)
.unwrap();
// Beta job: checks for alpha marker, outputs dep miss if not found
fs::write(beta_script, r#"#!/bin/bash
if [ ! -f /tmp/databuild_test_alpha_complete ]; then
echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
exit 1
fi
echo 'Beta succeeded'
"#).unwrap();
// Make scripts executable
fs::set_permissions(alpha_script, fs::Permissions::from_mode(0o755)).unwrap();
fs::set_permissions(beta_script, fs::Permissions::from_mode(0o755)).unwrap();
// Given: Set up orchestrator with alpha and beta jobs using test scripts
let mut orchestrator = build_orchestrator();
orchestrator.config = OrchestratorConfig {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: alpha_script.to_string(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: beta_script.to_string(),
},
],
};
let partition_beta = "data/beta";
let partition_alpha = "data/alpha";
@@ -636,70 +701,101 @@ mod tests {
// When: Run orchestrator steps to let it naturally handle the multi-hop build
// Step 1: Should schedule beta job (want -> not_started_jobs)
orchestrator.step().expect("step 1");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta job should be queued");
assert_eq!(
orchestrator.not_started_jobs.len(),
1,
"beta job should be queued"
);
// Step 2: Should start beta job (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 2");
assert_eq!(orchestrator.running_jobs.len(), 1, "beta job should be running");
// Step 3: Beta job detects missing alpha dep and creates want
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 3");
// (Beta should now be in dep_miss state, and a want for alpha should be created)
assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "beta should have dep miss");
// Verify want for alpha was created
let wants = orchestrator.bel.state.wants_schedulability().schedulable_wants();
assert!(
wants.iter().any(|w| w.partitions.iter().any(|p| p.r#ref == partition_alpha)),
"should create want for alpha partition"
assert_eq!(
orchestrator.running_jobs.len(),
1,
"beta job should be running"
);
// Step 4: Should schedule alpha job (want -> not_started_jobs)
orchestrator.step().expect("step 4");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "alpha job should be queued");
// Step 3: Beta job detects missing alpha dep and creates want
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// (Beta should now be in dep_miss state, and a want for alpha should be created)
assert_eq!(
orchestrator.dep_miss_jobs.len(),
1,
"beta should have dep miss"
);
// Step 5: Should start alpha job (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 5");
assert_eq!(orchestrator.running_jobs.len(), 1, "alpha job should be running");
// Step 4: Should schedule and start alpha job
// (dep miss handler created the alpha want, which will be picked up by poll_wants)
orchestrator.step().expect("step 4");
assert_eq!(
orchestrator.running_jobs.len(),
1,
"alpha job should be running"
);
// Step 6: Alpha completes successfully
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 6");
assert_eq!(orchestrator.completed_jobs.len(), 1, "alpha should complete");
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
assert_eq!(
orchestrator.bel.state.get_partition(partition_alpha).unwrap().status,
orchestrator.completed_jobs.len(),
1,
"alpha should complete"
);
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_alpha)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
// Step 7: Beta is rescheduled (want -> not_started_jobs)
// Step 7: Beta is rescheduled and started (want -> running_jobs)
orchestrator.step().expect("step 7");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta should be queued for retry");
// Step 8: Beta starts running (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 8");
assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running");
// Step 9: Beta completes successfully
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 9");
// Step 8: Beta completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// Then: Verify both partitions are live and both jobs completed
assert_eq!(orchestrator.completed_jobs.len(), 2, "both jobs should complete");
assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "should have one dep miss");
assert_eq!(
orchestrator.completed_jobs.len(),
2,
"both jobs should complete"
);
assert_eq!(
orchestrator.dep_miss_jobs.len(),
1,
"should have one dep miss"
);
assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail");
assert_eq!(
orchestrator.bel.state.get_partition(partition_alpha).unwrap().status,
orchestrator
.bel
.state
.get_partition(partition_alpha)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
assert_eq!(
orchestrator.bel.state.get_partition(partition_beta).unwrap().status,
orchestrator
.bel
.state
.get_partition(partition_beta)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"beta partition should be live after multi-hop build"
);
// Cleanup
let _ = fs::remove_file(marker_file);
let _ = fs::remove_file(alpha_script);
let _ = fs::remove_file(beta_script);
}
}