part way through multihop test

This commit is contained in:
Stuart Axelbrooke 2025-11-20 02:12:21 -08:00
parent eadd23eb63
commit a43e9fb6ea
5 changed files with 231 additions and 147 deletions

View file

@@ -165,8 +165,12 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
}
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
self.state.handle_event(&event)?;
let events = self.state.handle_event(&event)?;
let idx = self.storage.append_event(event)?;
// Recursion here might be dangerous, but in theory the event propagation always terminates
for event in events {
self.append_event(&event)?;
}
Ok(idx)
}
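
As a side note on these hunks: handlers now return follow-up events instead of (), and append_event appends them recursively. Below is a minimal self-contained sketch of that contract, using simplified stand-in types rather than the real databuild structs; propagation terminates because the derived events (want creations here) produce no further events.

// Stand-in types for illustration only; not the real databuild Event/BuildState/BuildEventLog.
#[derive(Debug, Clone)]
enum Event {
    DepMiss { missing: Vec<String> },
    WantCreate { partition: String },
}

#[derive(Default)]
struct State {
    wants: Vec<String>,
}

impl State {
    // Handlers return follow-up events instead of mutating the log directly.
    fn handle_event(&mut self, event: &Event) -> Vec<Event> {
        match event {
            Event::DepMiss { missing } => missing
                .iter()
                .map(|p| Event::WantCreate { partition: p.clone() })
                .collect(),
            Event::WantCreate { partition } => {
                self.wants.push(partition.clone());
                vec![] // terminal: a want creation derives no further events
            }
        }
    }
}

struct Log {
    state: State,
    events: Vec<Event>,
}

impl Log {
    // Mirrors BuildEventLog::append_event: update state, append, then recurse on derived events.
    fn append_event(&mut self, event: &Event) -> usize {
        let derived = self.state.handle_event(event);
        self.events.push(event.clone());
        let idx = self.events.len() - 1;
        for e in derived {
            self.append_event(&e);
        }
        idx
    }
}

fn main() {
    let mut log = Log { state: State::default(), events: vec![] };
    log.append_event(&Event::DepMiss { missing: vec!["data/alpha".into()] });
    // Both the dep miss and the derived WantCreate end up in the log.
    assert_eq!(log.events.len(), 2);
    assert_eq!(log.state.wants, vec!["data/alpha".to_string()]);
}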

View file

@@ -1,6 +1,5 @@
use crate::data_build_event::Event;
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::job_run::{DepMissJobRun, SubProcessBackend};
use crate::util::{current_timestamp, DatabuildError};
use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::FromSql;
@@ -55,7 +54,8 @@ impl BuildState {
}
/// Handles reacting to events, updating state, and erroring if it's an invalid state transition
pub fn handle_event(&mut self, event: &Event) -> Result<(), DatabuildError> {
/// Event handlers can return vecs of events that will then be appended to the BEL
pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
match event {
// JobRun events
Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
@@ -75,22 +75,22 @@ impl BuildState {
}
}
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<(), DatabuildError> {
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<Vec<Event>, DatabuildError> {
self.wants
.insert(event.want_id.clone(), event.clone().into());
Ok(())
Ok(vec!())
}
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
}
Ok(())
Ok(vec!())
}
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<(), DatabuildError> {
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<Vec<Event>, DatabuildError> {
// No job run should exist
if self.job_runs.get(&event.job_run_id).is_some() {
Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
@@ -109,7 +109,7 @@ impl BuildState {
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
Ok(())
Ok(vec!())
}
}
@@ -194,14 +194,15 @@ impl BuildState {
fn handle_job_run_heartbeat(
&mut self,
event: &JobRunHeartbeatEventV1,
) -> Result<(), DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)
) -> Result<Vec<Event>, DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?;
Ok(vec!())
}
fn handle_job_run_success(
&mut self,
event: &JobRunSuccessEventV1,
) -> Result<(), DatabuildError> {
) -> Result<Vec<Event>, DatabuildError> {
println!("Job run success event: {:?}", event);
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
@@ -213,7 +214,7 @@ impl BuildState {
Some(&event.job_run_id),
)?;
}
Ok(())
Ok(vec!())
}
fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
@@ -232,7 +233,7 @@ impl BuildState {
fn handle_job_run_failure(
&mut self,
event: &JobRunFailureEventV1,
) -> Result<(), DatabuildError> {
) -> Result<Vec<Event>, DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
for pref in job_run.building_partitions {
@@ -242,25 +243,45 @@ impl BuildState {
Some(&event.job_run_id),
)?;
}
Ok(())
Ok(vec!())
}
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<(), DatabuildError> {
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
todo!("should update already inserted job run, partition status, want status")
}
fn handle_job_run_dep_miss(
pub fn handle_job_run_dep_miss(
&mut self,
event: &JobRunMissingDepsEventV1,
) -> Result<(), DatabuildError> {
todo!("should update already inserted job run, schedule wants...?")
) -> Result<Vec<Event>, DatabuildError> {
let job_run_detail = self
.get_job_run(&event.job_run_id)
.ok_or(format!(
"Unable to find job run with id `{}`",
event.job_run_id
))?;
// Infer data/SLA timestamps from upstream want
let want_timestamps: WantTimestamps = job_run_detail
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No servicing wants found"))?;
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
event.missing_deps.clone(),
&event.job_run_id,
want_timestamps,
);
Ok(want_events)
}
fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<(), DatabuildError> {
fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<Vec<Event>, DatabuildError> {
todo!("...?")
}
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> {
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<Vec<Event>, DatabuildError> {
todo!("...?")
}
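
A stand-in sketch of the timestamp fold used in handle_job_run_dep_miss above: timestamps from every servicing want are reduced with WantTimestamps::merge, and having no servicing want at all is an error. Only sla_seconds is visible in this diff; the data_timestamp field and the merge semantics (earliest data timestamp, tightest SLA) are assumptions made purely for illustration.

// Simplified stand-in for WantTimestamps; field set and merge semantics are assumptions.
#[derive(Debug, Clone, Copy, PartialEq)]
struct WantTimestamps {
    data_timestamp: u64,
    sla_seconds: u64,
}

impl WantTimestamps {
    // Assumed semantics: keep the earliest data timestamp and the tightest SLA.
    fn merge(self, other: WantTimestamps) -> WantTimestamps {
        WantTimestamps {
            data_timestamp: self.data_timestamp.min(other.data_timestamp),
            sla_seconds: self.sla_seconds.min(other.sla_seconds),
        }
    }
}

fn main() -> Result<(), String> {
    let servicing_wants = vec![
        WantTimestamps { data_timestamp: 1_700_000_000, sla_seconds: 7_200 },
        WantTimestamps { data_timestamp: 1_699_990_000, sla_seconds: 3_600 },
    ];
    // Same shape as the handler: reduce over servicing wants, error if there are none.
    let merged = servicing_wants
        .into_iter()
        .reduce(WantTimestamps::merge)
        .ok_or("No servicing wants found".to_string())?;
    assert_eq!(
        merged,
        WantTimestamps { data_timestamp: 1_699_990_000, sla_seconds: 3_600 }
    );
    Ok(())
}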
@@ -382,39 +403,6 @@ impl BuildState {
.collect(),
)
}
/// Maps a dep miss into the BEL events it implies, so that the job can be run successfully later
pub fn dep_miss_to_events(
&self,
dep_miss: &DepMissJobRun<SubProcessBackend>,
) -> Result<Vec<Event>, DatabuildError> {
let mut events = vec![];
// Append literal job run dep miss
events.push(dep_miss.state.to_event(&dep_miss.id()));
// Append wants from dep miss
let job_run_detail = self
.get_job_run(&dep_miss.job_run_id.to_string())
.ok_or(format!(
"Unable to find job run with id `{}`",
dep_miss.job_run_id
))?;
// Infer data/SLA timestamps from upstream want
let want_timestamps: WantTimestamps = job_run_detail
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No servicing wants found"))?;
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
dep_miss.state.missing_deps.clone(),
&dep_miss.job_run_id,
want_timestamps,
);
events.extend(want_events);
Ok(events)
}
}
/// The status of partitions required by a want to build (sensed from dep miss job run)

View file

@@ -111,7 +111,7 @@ impl WantTimestamps {
pub fn missing_deps_to_want_events(
missing_deps: Vec<MissingDeps>,
job_run_id: &Uuid,
job_run_id: &String,
want_timestamps: WantTimestamps,
) -> Vec<Event> {
missing_deps
@@ -125,7 +125,7 @@ pub fn missing_deps_to_want_events(
sla_seconds: want_timestamps.sla_seconds,
source: Some(
JobTriggeredEvent {
job_run_id: job_run_id.to_string(),
job_run_id: job_run_id.clone(),
}
.into(),
),
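
An illustrative stand-in for the mapping this function performs: each MissingDeps entry becomes a want for its missing partitions, attributed back to the triggering job run by its id (now a String rather than a Uuid). The structs below are simplified stand-ins for the generated event types, and SLA handling is reduced to a single field.

// Simplified stand-ins; the real code builds WantCreateEventV1 with a JobTriggeredEvent source.
#[derive(Debug, Clone)]
struct MissingDeps {
    impacted: Vec<String>,
    missing: Vec<String>,
}

#[derive(Debug, PartialEq)]
struct WantCreate {
    partitions: Vec<String>,
    sla_seconds: u64,
    source_job_run_id: String,
}

fn missing_deps_to_wants(
    missing_deps: Vec<MissingDeps>,
    job_run_id: &String,
    sla_seconds: u64,
) -> Vec<WantCreate> {
    missing_deps
        .into_iter()
        .map(|dep| WantCreate {
            partitions: dep.missing,
            sla_seconds,
            source_job_run_id: job_run_id.clone(),
        })
        .collect()
}

fn main() {
    let wants = missing_deps_to_wants(
        vec![MissingDeps {
            impacted: vec!["data/beta".into()],
            missing: vec!["data/alpha".into()],
        }],
        &"job-123".to_string(),
        3_600,
    );
    assert_eq!(wants.len(), 1);
    assert_eq!(wants[0].partitions, vec!["data/alpha".to_string()]);
    assert_eq!(wants[0].source_job_run_id, "job-123");
}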

View file

@@ -1,4 +1,6 @@
use std::collections::HashMap;
use crate::data_deps::DataDepLogLine;
use crate::{JobRunMissingDeps, MissingDeps};
pub struct MockJobRun {
sleep_ms: u64,
@@ -51,6 +53,16 @@ impl MockJobRun {
self
}
pub fn dep_miss(self, missing_deps: Vec<MissingDeps>) -> Self {
self.exit_code(1)
.stdout_msg(
&DataDepLogLine::DepMiss(JobRunMissingDeps {
version: "1".to_string(),
missing_deps,
}).into()
)
}
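
A hypothetical round trip of the protocol this builder leans on: the mock job exits non-zero and prints a structured dep-miss line on stdout, which the orchestrator side parses back into missing deps. The wire format below (a DATABUILD_DEP_MISS prefix with a naive encoding) is invented for illustration only; the real encoding comes from DataDepLogLine's conversion into a log line.

// Stand-in type and an invented wire format; the real crate uses DataDepLogLine / JobRunMissingDeps.
#[derive(Debug, PartialEq)]
struct MissingDeps {
    impacted: Vec<String>,
    missing: Vec<String>,
}

// A hypothetical stdout line: prefix, then impacted and missing partitions.
fn encode_dep_miss(dep: &MissingDeps) -> String {
    format!(
        "DATABUILD_DEP_MISS impacted={} missing={}",
        dep.impacted.join(","),
        dep.missing.join(",")
    )
}

fn decode_dep_miss(line: &str) -> Option<MissingDeps> {
    let rest = line.strip_prefix("DATABUILD_DEP_MISS ")?;
    let mut impacted = Vec::new();
    let mut missing = Vec::new();
    for field in rest.split_whitespace() {
        if let Some(v) = field.strip_prefix("impacted=") {
            impacted = v.split(',').map(|s| s.to_string()).collect();
        } else if let Some(v) = field.strip_prefix("missing=") {
            missing = v.split(',').map(|s| s.to_string()).collect();
        }
    }
    Some(MissingDeps { impacted, missing })
}

fn main() {
    let dep = MissingDeps {
        impacted: vec!["data/beta".into()],
        missing: vec!["data/alpha".into()],
    };
    let line = encode_dep_miss(&dep);
    // A mock job would print `line` to stdout and exit with code 1;
    // the orchestrator side decodes it back into missing deps.
    assert_eq!(decode_dep_miss(&line), Some(dep));
}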
pub fn to_env(&self) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert(

View file

@@ -188,9 +188,8 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
JobRunVisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
self.bel.append_event(&event)?;
}
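// The derived WantCreate events now come from BuildState::handle_job_run_dep_miss and are
// appended through append_event's recursion, so only the literal dep-miss event is appended here.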
let event = dep_miss.state.to_event(&dep_miss.id());
self.bel.append_event(&event)?;
self.dep_miss_jobs.push(dep_miss);
}
}
@@ -451,95 +450,94 @@ mod tests {
use crate::data_build_event::Event;
use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{
JobRunBufferEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1,
};
use crate::{JobRunBufferEventV1, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1};
use std::marker::PhantomData;
use uuid::Uuid;
/// Use case: The orchestrator should map a failed job into a set of wants
#[test]
fn test_job_fail_want_mapping() {
// Given a
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// Add event for originating want
let want_create = WantCreateEventV1::sample();
let building_partitions = vec!["data/beta".into()];
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: building_partitions.clone(),
..want_create.clone()
}))
.expect("event append");
// Create failed job run detail
let want_attributed_partitions: Vec<WantAttributedPartitions> =
vec![want_create.clone().into()];
let job_run_id = Uuid::new_v4();
let job_run = JobRunBufferEventV1 {
job_run_id: job_run_id.into(),
building_partitions: building_partitions.clone(),
want_attributed_partitions: want_attributed_partitions.clone(),
..JobRunBufferEventV1::default()
};
orchestrator
.bel
.append_event(&Event::JobRunBufferV1(job_run))
.expect("event append");
// Job runs should not be empty
orchestrator
.bel
.state
.get_job_run(&job_run_id.to_string())
.expect("job run should exist");
// Add event for job failure
let dep_miss_job_run = DepMissJobRun {
job_run_id,
state: SubProcessDepMiss {
stdout_buffer: vec![],
missing_deps: vec![MissingDeps {
impacted: vec!["data/beta".into()],
missing: vec!["data/alpha".into()],
}],
read_deps: vec![],
},
_backend: PhantomData,
};
// When calculating events from dep miss
// TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
let events = orchestrator
.bel
.state
.dep_miss_to_events(&dep_miss_job_run)
.unwrap();
// Should have scheduled a job for alpha
assert_eq!(
events
.iter()
.filter(|e| match e {
Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
_ => false,
})
.count(),
1
);
assert!(
orchestrator.not_started_jobs.is_empty(),
"shouldn't have scheduled yet"
);
// Should schedule job after we poll wants
orchestrator.poll_wants().expect("poll wants");
assert_eq!(
orchestrator.not_started_jobs.len(),
1,
"should have scheduled job"
);
}
// /// Use case: The orchestrator should map a failed job into a set of wants
// #[test]
// fn test_job_fail_want_mapping() {
// // Given a
// let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// // Add event for originating want
// let want_create = WantCreateEventV1::sample();
// let building_partitions = vec!["data/beta".into()];
// orchestrator
// .bel
// .append_event(&Event::WantCreateV1(WantCreateEventV1 {
// partitions: building_partitions.clone(),
// ..want_create.clone()
// }))
// .expect("event append");
// // Create failed job run detail
// let want_attributed_partitions: Vec<WantAttributedPartitions> =
// vec![want_create.clone().into()];
// let job_run_id = Uuid::new_v4();
// let job_run = JobRunBufferEventV1 {
// job_run_id: job_run_id.into(),
// building_partitions: building_partitions.clone(),
// want_attributed_partitions: want_attributed_partitions.clone(),
// ..JobRunBufferEventV1::default()
// };
// orchestrator
// .bel
// .append_event(&Event::JobRunBufferV1(job_run))
// .expect("event append");
//
// // Job runs should not be empty
// orchestrator
// .bel
// .state
// .get_job_run(&job_run_id.to_string())
// .expect("job run should exist");
//
// // Add event for job failure
// let dep_miss_job_run = DepMissJobRun {
// job_run_id,
// state: SubProcessDepMiss {
// stdout_buffer: vec![],
// missing_deps: vec![MissingDeps {
// impacted: vec!["data/beta".into()],
// missing: vec!["data/alpha".into()],
// }],
// read_deps: vec![],
// },
// _backend: PhantomData,
// };
//
// // When calculating events from dep miss
// // TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
// let dep_miss_event = dep_miss_job_run.state.to_event(&dep_miss_job_run.id());
// let events = orchestrator
// .bel
// .state
// .handle_job_run_dep_miss(&dep_miss_event)
// .unwrap();
//
// // Should have scheduled a job for alpha
// assert_eq!(
// events
// .iter()
// .filter(|e| match e {
// Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
// _ => false,
// })
// .count(),
// 1
// );
// assert!(
// orchestrator.not_started_jobs.is_empty(),
// "shouldn't have scheduled yet"
// );
//
// // Should schedule job after we poll wants
// orchestrator.poll_wants().expect("poll wants");
// assert_eq!(
// orchestrator.not_started_jobs.len(),
// 1,
// "should have scheduled job"
// );
// }
}
/// Orchestrator needs to be able to achieve high level orchestration use cases.
@@ -619,7 +617,89 @@ mod tests {
#[test]
#[ignore]
fn test_multi_hop_want_builds_partition() {
todo!()
// Given: Set up orchestrator with alpha and beta jobs
// In this scenario: beta depends on alpha
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
let partition_beta = "data/beta";
let partition_alpha = "data/alpha";
// Create initial want for beta partition
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: vec![partition_beta.into()],
..WantCreateEventV1::sample()
}))
.expect("event append");
// When: Run orchestrator steps to let it naturally handle the multi-hop build
// Step 1: Should schedule beta job (want -> not_started_jobs)
orchestrator.step().expect("step 1");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta job should be queued");
// Step 2: Should start beta job (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 2");
assert_eq!(orchestrator.running_jobs.len(), 1, "beta job should be running");
// Step 3: Beta job detects missing alpha dep and creates want
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 3");
// (Beta should now be in dep_miss state, and a want for alpha should be created)
assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "beta should have dep miss");
// Verify want for alpha was created
let wants = orchestrator.bel.state.wants_schedulability().schedulable_wants();
assert!(
wants.iter().any(|w| w.partitions.iter().any(|p| p.r#ref == partition_alpha)),
"should create want for alpha partition"
);
// Step 4: Should schedule alpha job (want -> not_started_jobs)
orchestrator.step().expect("step 4");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "alpha job should be queued");
// Step 5: Should start alpha job (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 5");
assert_eq!(orchestrator.running_jobs.len(), 1, "alpha job should be running");
// Step 6: Alpha completes successfully
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 6");
assert_eq!(orchestrator.completed_jobs.len(), 1, "alpha should complete");
assert_eq!(
orchestrator.bel.state.get_partition(partition_alpha).unwrap().status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
// Step 7: Beta is rescheduled (want -> not_started_jobs)
orchestrator.step().expect("step 7");
assert_eq!(orchestrator.not_started_jobs.len(), 1, "beta should be queued for retry");
// Step 8: Beta starts running (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 8");
assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running");
// Step 9: Beta completes successfully
thread::sleep(Duration::from_millis(10));
orchestrator.step().expect("step 9");
// Then: Verify both partitions are live and both jobs completed
assert_eq!(orchestrator.completed_jobs.len(), 2, "both jobs should complete");
assert_eq!(orchestrator.dep_miss_jobs.len(), 1, "should have one dep miss");
assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail");
assert_eq!(
orchestrator.bel.state.get_partition(partition_alpha).unwrap().status,
Some(PartitionStatusCode::PartitionLive.into()),
"alpha partition should be live"
);
assert_eq!(
orchestrator.bel.state.get_partition(partition_beta).unwrap().status,
Some(PartitionStatusCode::PartitionLive.into()),
"beta partition should be live after multi-hop build"
);
}
}
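
Once the test above is un-ignored, one possible follow-up: the fixed step counts and 10 ms sleeps are timing-sensitive, so a driver that steps until a condition holds (or a step budget runs out) may be more robust. A generic sketch with the orchestrator interactions replaced by closures; the 50-step budget is an arbitrary assumption.

use std::cell::Cell;

// Steps until `done()` holds or `max_steps` is exhausted; returns whether the condition was reached.
fn step_until<E>(
    mut step: impl FnMut() -> Result<(), E>,
    mut done: impl FnMut() -> bool,
    max_steps: usize,
) -> Result<bool, E> {
    for _ in 0..max_steps {
        if done() {
            return Ok(true);
        }
        step()?;
    }
    Ok(done())
}

fn main() {
    // Toy usage: "done" after three steps; in the test this would wrap
    // orchestrator.step() and a partition-liveness check.
    let n = Cell::new(0);
    let reached: Result<bool, ()> = step_until(
        || {
            n.set(n.get() + 1);
            Ok(())
        },
        || n.get() >= 3,
        50,
    );
    assert!(reached.unwrap());
}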