Compare commits
No commits in common. "8208af66059308edd8d4134d2ebc6b2dc352042b" and "eadd23eb63f0bab104c5aa56a2819400443738a7" have entirely different histories.
8208af6605 ... eadd23eb63
9 changed files with 191 additions and 598 deletions
@@ -1,11 +1,12 @@
- use crate::build_state::BuildState;
  use crate::data_build_event::Event;
- use crate::util::{current_timestamp, DatabuildError};
- use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1};
+ use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
  use prost::Message;
  use rusqlite::Connection;
+ use std::error::Error;
  use std::fmt::Debug;
  use std::time::{SystemTime, UNIX_EPOCH};
+ use crate::build_state::BuildState;
+ use crate::util::{current_timestamp, DatabuildError};

  pub trait BELStorage {
  fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
@@ -164,12 +165,8 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
  }

  pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
- let events = self.state.handle_event(&event)?;
+ self.state.handle_event(&event)?;
  let idx = self.storage.append_event(event)?;
- // Recursion here might be dangerous, but in theory the event propagation always terminates
- for event in events {
- self.append_event(&event)?;
- }
  Ok(idx)
  }
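Editor's note (not part of the diff): the removed lines made BuildEventLog::append_event recurse on whatever follow-up events BuildState::handle_event returned; the new side appends exactly one event and leaves fan-out to the caller (see the orchestrator's DepMiss branch later in this compare). Below is a minimal, self-contained Rust sketch of the two shapes; Ev and Log are stand-in types invented for illustration, not the crate's Event/BuildEventLog.

    // Stand-in types; the real crate uses BuildEventLog<S>, Event, DatabuildError.
    #[derive(Clone, Debug)]
    struct Ev(String);

    #[derive(Default)]
    struct Log {
        entries: Vec<Ev>,
    }

    impl Log {
        // Old shape: state handling yields derived events and the log recurses on them.
        fn append_recursing(&mut self, ev: &Ev, derive: &dyn Fn(&Ev) -> Vec<Ev>) -> u64 {
            let derived = derive(ev);
            self.entries.push(ev.clone());
            let idx = (self.entries.len() - 1) as u64;
            for d in derived {
                // Termination relies on `derive` eventually returning no new events.
                self.append_recursing(&d, derive);
            }
            idx
        }

        // New shape: append one event only; derived events are computed elsewhere
        // and appended by the caller.
        fn append_one(&mut self, ev: &Ev) -> u64 {
            self.entries.push(ev.clone());
            (self.entries.len() - 1) as u64
        }
    }

    fn main() {
        let mut log = Log::default();
        let derive = |e: &Ev| if e.0 == "dep_miss" { vec![Ev("want_create".into())] } else { vec![] };
        log.append_recursing(&Ev("dep_miss".into()), &derive);
        log.append_one(&Ev("heartbeat".into()));
        assert_eq!(log.entries.len(), 3);
    }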
@@ -208,7 +205,7 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
  }

  pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result<CreateTaintResponse, DatabuildError> {
  // TODO Need to do this hierarchically? A taint will impact downstream partitions also
  todo!();
  let ev: TaintCreateEventV1 = req.into();
  self.append_event(&ev.clone().into())?;
@@ -236,11 +233,11 @@ impl Clone for BuildEventLog<MemoryBELStorage> {

  mod tests {
  mod sqlite_bel_storage {
+ use uuid::Uuid;
  use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
- use crate::build_state::BuildState;
  use crate::data_build_event::Event;
  use crate::{PartitionRef, WantCreateEventV1};
- use uuid::Uuid;
+ use crate::build_state::BuildState;

  #[test]
  fn test_sqlite_append_event() {
@@ -1,16 +1,13 @@
  use crate::data_build_event::Event;
- use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
- use crate::util::{DatabuildError, current_timestamp};
- use crate::{
- JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
- JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
- ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
- ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
- PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail,
- WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
- };
+ use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
+ use crate::job_run::{DepMissJobRun, SubProcessBackend};
+ use crate::util::{current_timestamp, DatabuildError};
+ use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
+ use rusqlite::types::FromSql;
+ use rusqlite::ToSql;
  use serde::{Deserialize, Serialize};
  use std::collections::BTreeMap;
+ use std::error::Error;

  /**
  Design Notes
@@ -58,8 +55,7 @@ impl BuildState {
  }

  /// Handles reacting to events, updating state, and erroring if its an invalid state transition
- /// Event handlers can return vecs of events that will then be appended to the BEL
- pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
+ pub fn handle_event(&mut self, event: &Event) -> Result<(), DatabuildError> {
  match event {
  // JobRun events
  Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
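Editor's note (not part of the diff): this and the following hunks change every handler from returning derived events to returning nothing; the handlers now only mutate state. A compact sketch of the signature shift, with stand-in Event and error types rather than the crate's own:

    // Stand-in event and error types for illustration only.
    enum Event { WantCreate(String), JobRunHeartbeat(String) }
    type DbError = String;

    struct StateOld;
    impl StateOld {
        // Old: every handler could emit follow-up events for the log to append.
        fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DbError> {
            match event {
                Event::WantCreate(_) => Ok(vec![]),
                Event::JobRunHeartbeat(_) => Ok(vec![]),
            }
        }
    }

    struct StateNew;
    impl StateNew {
        // New: handlers only mutate state; derived events are produced elsewhere.
        fn handle_event(&mut self, event: &Event) -> Result<(), DbError> {
            match event {
                Event::WantCreate(_) => Ok(()),
                Event::JobRunHeartbeat(_) => Ok(()),
            }
        }
    }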
@@ -79,31 +75,22 @@ impl BuildState {
  }
  }

- fn handle_want_create(
- &mut self,
- event: &WantCreateEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<(), DatabuildError> {
  self.wants
  .insert(event.want_id.clone(), event.clone().into());
- Ok(vec![])
+ Ok(())
  }

- fn handle_want_cancel(
- &mut self,
- event: &WantCancelEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
  // TODO actually cancel in-progress job runs that no longer have a sponsoring want
  if let Some(want) = self.wants.get_mut(&event.want_id) {
  want.status = Some(WantStatusCode::WantCanceled.into());
  want.last_updated_timestamp = current_timestamp();
  }
- Ok(vec![])
+ Ok(())
  }

- fn handle_job_run_buffer(
- &mut self,
- event: &JobRunBufferEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<(), DatabuildError> {
  // No job run should exist
  if self.job_runs.get(&event.job_run_id).is_some() {
  Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
@@ -111,18 +98,18 @@ impl BuildState {
  // Create job run to be inserted
  let job_run: JobRunDetail = event.clone().into();

- // Mark all servicing wants as WantBuilding
- for wap in &job_run.servicing_wants {
- if let Some(want) = self.wants.get_mut(&wap.want_id) {
+ for pref in job_run.building_partitions.iter() {
+ // Update all wants that point to this partition ref to `Building`
+ // Query notes: "update all wants that point to this partition to building"
+ if let Some(want) = self.wants.get_mut(&pref.r#ref) {
  want.status = Some(WantStatusCode::WantBuilding.into());
- want.last_updated_timestamp = current_timestamp();
  }
  }

  self.job_runs
  .insert(event.job_run_id.clone(), job_run.clone());
  println!("Inserted job run: {:?}", job_run);
- Ok(vec![])
+ Ok(())
  }
  }
@@ -207,70 +194,26 @@ impl BuildState {
  fn handle_job_run_heartbeat(
  &mut self,
  event: &JobRunHeartbeatEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
- self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?;
- Ok(vec![])
+ ) -> Result<(), DatabuildError> {
+ self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)
  }

  fn handle_job_run_success(
  &mut self,
  event: &JobRunSuccessEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ ) -> Result<(), DatabuildError> {
  println!("Job run success event: {:?}", event);
  self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
  let job_run = self.get_job_run(&event.job_run_id).unwrap();

- // Clone building_partitions before we use it multiple times
- let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
-
  // Update partitions being build by this job
- for pref in &newly_live_partitions {
+ for pref in job_run.building_partitions {
  self.update_partition_status(
- pref,
+ &pref,
  PartitionStatusCode::PartitionLive,
  Some(&event.job_run_id),
  )?;
  }
+ Ok(())
- // Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
- let wants_to_update: Vec<String> = self
- .wants
- .iter()
- .filter(|(_, want)| {
- want.status.as_ref().map(|s| s.code)
- == Some(WantStatusCode::WantUpstreamBuilding as i32)
- })
- .filter(|(_, want)| {
- // Check if this want was waiting for any of the newly live partitions
- want.upstreams.iter().any(|upstream| {
- newly_live_partitions
- .iter()
- .any(|p| p.r#ref == upstream.r#ref)
- })
- })
- .map(|(want_id, _)| want_id.clone())
- .collect();
-
- for want_id in wants_to_update {
- if let Some(want) = self.wants.get_mut(&want_id) {
- // Check if all upstreams are now satisfied
- let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
- self.partitions
- .get(&upstream.r#ref)
- .and_then(|p| p.status.as_ref())
- .map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
- .unwrap_or(false)
- });
-
- if all_upstreams_satisfied {
- // Transition back to WantIdle so it can be rescheduled
- want.status = Some(WantStatusCode::WantIdle.into());
- want.last_updated_timestamp = current_timestamp();
- }
- }
- }
-
- Ok(vec![])
  }

  fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
@@ -289,126 +232,35 @@ impl BuildState {
  fn handle_job_run_failure(
  &mut self,
  event: &JobRunFailureEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ ) -> Result<(), DatabuildError> {
  self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
  let job_run = self.get_job_run(&event.job_run_id).unwrap();
- // Clone building_partitions before we use it multiple times
- let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
-
- for pref in &failed_partitions {
+ for pref in job_run.building_partitions {
  self.update_partition_status(
- pref,
+ &pref,
  PartitionStatusCode::PartitionFailed,
  Some(&event.job_run_id),
  )?;
  }
+ Ok(())
- // Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
- let wants_to_fail: Vec<String> = self
- .wants
- .iter()
- .filter(|(_, want)| {
- want.status.as_ref().map(|s| s.code)
- == Some(WantStatusCode::WantUpstreamBuilding as i32)
- })
- .filter(|(_, want)| {
- // Check if this want was waiting for any of the failed partitions
- want.upstreams
- .iter()
- .any(|upstream| failed_partitions.iter().any(|p| p.r#ref == upstream.r#ref))
- })
- .map(|(want_id, _)| want_id.clone())
- .collect();
-
- for want_id in wants_to_fail {
- if let Some(want) = self.wants.get_mut(&want_id) {
- // Transition to WantUpstreamFailed since a dependency failed
- want.status = Some(WantStatusCode::WantUpstreamFailed.into());
- want.last_updated_timestamp = current_timestamp();
- }
- }
-
- Ok(vec![])
  }

- fn handle_job_run_cancel(
- &mut self,
- event: &JobRunCancelEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<(), DatabuildError> {
  todo!("should update already inserted job run, partition status, want status")
  }

- pub fn handle_job_run_dep_miss(
+ fn handle_job_run_dep_miss(
  &mut self,
  event: &JobRunMissingDepsEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
- let job_run_detail = self.get_job_run(&event.job_run_id).ok_or(format!(
- "Unable to find job run with id `{}`",
- event.job_run_id
- ))?;
- // Infer data/SLA timestamps from upstream want
- let want_timestamps: WantTimestamps = job_run_detail
- .servicing_wants
- .iter()
- .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
- .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
- .ok_or(format!("No servicing wants found"))?;
-
- // Update servicing wants to track missing dependencies as upstreams
- for servicing_want in &job_run_detail.servicing_wants {
- if let Some(want) = self.wants.get_mut(&servicing_want.want_id) {
- let mut want_is_impacted = false;
- for missing_dep in &event.missing_deps {
- // Only update this want if it contains an impacted partition
- let impacted = missing_dep
- .impacted
- .iter()
- .any(|impacted| want.partitions.iter().any(|p| p.r#ref == impacted.r#ref));
-
- if impacted {
- want_is_impacted = true;
- // Add missing partitions to upstreams
- for missing_partition in &missing_dep.missing {
- want.upstreams.push(missing_partition.clone());
- }
- }
- }
-
- // Dedupe upstreams
- let mut seen = std::collections::HashSet::new();
- want.upstreams.retain(|p| seen.insert(p.r#ref.clone()));
-
- // Set impacted wants to WantUpstreamBuilding so they won't be rescheduled
- // until their dependencies are ready
- if want_is_impacted {
- want.status = Some(WantStatusCode::WantUpstreamBuilding.into());
- want.last_updated_timestamp = current_timestamp();
- }
- }
- }
-
- // Create wants from dep misses
- let want_events = missing_deps_to_want_events(
- event.missing_deps.clone(),
- &event.job_run_id,
- want_timestamps,
- );
-
- Ok(want_events)
+ ) -> Result<(), DatabuildError> {
+ todo!("should update already inserted job run, schedule wants...?")
  }

- fn handle_taint_create(
- &mut self,
- event: &TaintCreateEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<(), DatabuildError> {
  todo!("...?")
  }

- fn handle_taint_delete(
- &mut self,
- event: &TaintCancelEventV1,
- ) -> Result<Vec<Event>, DatabuildError> {
+ fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> {
  todo!("...?")
  }
@@ -516,18 +368,53 @@ impl BuildState {
  WantsSchedulability(
  self.wants
  .values()
- // Only consider idle wants for schedulability - all other states are either
- // terminal (successful/failed/canceled/upstream failed) or waiting for an event
- // (building/upstream building)
+ // Do not consider fulfilled or currently building wants in schedulability query
  .filter(|w| {
  w.status.clone().expect("want must have status").code
- == WantStatusCode::WantIdle as i32
+ != WantStatusCode::WantSuccessful as i32
+ })
+ .filter(|w| {
+ w.status.clone().expect("want must have status").code
+ != WantStatusCode::WantBuilding as i32
  })
  .cloned()
  .map(|w| self.want_schedulability(&w))
  .collect(),
  )
  }

+ /// Maps a dep miss into the BEL events it implies, so that the job can be run successfully later
+ pub fn dep_miss_to_events(
+ &self,
+ dep_miss: &DepMissJobRun<SubProcessBackend>,
+ ) -> Result<Vec<Event>, DatabuildError> {
+ let mut events = vec![];
+ // Append literal job run dep miss
+ events.push(dep_miss.state.to_event(&dep_miss.id()));
+ // Append wants from dep miss
+ let job_run_detail = self
+ .get_job_run(&dep_miss.job_run_id.to_string())
+ .ok_or(format!(
+ "Unable to find job run with id `{}`",
+ dep_miss.job_run_id
+ ))?;
+ // Infer data/SLA timestamps from upstream want
+ let want_timestamps: WantTimestamps = job_run_detail
+ .servicing_wants
+ .iter()
+ .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
+ .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
+ .ok_or(format!("No servicing wants found"))?;
+ // Create wants from dep misses
+ let want_events = missing_deps_to_want_events(
+ dep_miss.state.missing_deps.clone(),
+ &dep_miss.job_run_id,
+ want_timestamps,
+ );
+ events.extend(want_events);
+
+ Ok(events)
+ }
  }

  /// The status of partitions required by a want to build (sensed from dep miss job run)
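Editor's note (not part of the diff): the schedulability query changes from "only WantIdle wants are considered" to "everything except WantSuccessful and WantBuilding is considered", which lets failed or canceled wants flow through again. A small sketch of the two predicates over a stand-in status enum (the real code compares prost-generated i32 status codes instead):

    // Stand-in status enum for illustration; not the crate's WantStatusCode.
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum WantStatus { Idle, Building, Successful, Failed, Canceled }

    // Old predicate: only idle wants reach the schedulability check.
    fn considered_old(s: WantStatus) -> bool {
        s == WantStatus::Idle
    }

    // New predicate: drop successful and currently-building wants, keep the rest.
    fn considered_new(s: WantStatus) -> bool {
        s != WantStatus::Successful && s != WantStatus::Building
    }

    fn main() {
        assert!(considered_old(WantStatus::Idle));
        assert!(!considered_old(WantStatus::Failed));
        assert!(considered_new(WantStatus::Failed)); // behavioral difference
        assert!(!considered_new(WantStatus::Building));
    }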
@@ -656,105 +543,6 @@ mod tests {
  assert!(!ws.is_schedulable());
  }

- #[test]
- fn test_want_not_schedulable_after_dep_miss_until_deps_exist() {
- use crate::{
- JobRunDetail, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions,
- };
- use std::collections::BTreeMap;
-
- // Given: A want with a job run that had a dep miss
- let beta_partition = "data/beta";
- let alpha_partition = "data/alpha";
- let want_id = "beta_want";
- let job_run_id = "job_123";
-
- let mut state = BuildState::default().with_wants(BTreeMap::from([(
- want_id.to_string(),
- WantDetail::default()
- .with_partitions(vec![beta_partition.into()])
- .with_status(Some(WantStatusCode::WantIdle.into())),
- )]));
-
- // Job run exists for this want
- state.job_runs.insert(
- job_run_id.to_string(),
- JobRunDetail {
- id: job_run_id.to_string(),
- servicing_wants: vec![WantAttributedPartitions {
- want_id: want_id.to_string(),
- partitions: vec![beta_partition.into()],
- }],
- ..Default::default()
- },
- );
-
- // Initially, want should be schedulable (no known upstreams)
- let schedulability = state.wants_schedulability();
- assert_eq!(schedulability.0.len(), 1);
- assert!(
- schedulability.0[0].is_schedulable(),
- "want should be schedulable before dep miss"
- );
-
- // When: Job run fails with dep miss indicating it needs alpha
- let dep_miss_event = JobRunMissingDepsEventV1 {
- job_run_id: job_run_id.to_string(),
- missing_deps: vec![MissingDeps {
- impacted: vec![beta_partition.into()],
- missing: vec![alpha_partition.into()],
- }],
- read_deps: vec![],
- };
-
- let new_events = state.handle_job_run_dep_miss(&dep_miss_event).unwrap();
- for event in new_events {
- state.handle_event(&event).unwrap();
- }
-
- // Then: Beta want should be in WantUpstreamBuilding status and not in schedulability list
- let schedulability = state.wants_schedulability();
-
- // The schedulability list should contain the newly created alpha want, but not the beta want
- let has_beta_want = schedulability
- .0
- .iter()
- .any(|ws| ws.want.partitions.iter().any(|p| p.r#ref == beta_partition));
- assert!(
- !has_beta_want,
- "beta want should not appear in schedulability list when in WantUpstreamBuilding status"
- );
-
- // The alpha want should be schedulable
- let has_alpha_want = schedulability.0.iter().any(|ws| {
- ws.want
- .partitions
- .iter()
- .any(|p| p.r#ref == alpha_partition)
- });
- assert!(
- has_alpha_want,
- "alpha want should be schedulable (newly created from dep miss)"
- );
-
- // Verify the beta want is now in WantUpstreamBuilding status
- let beta_want = state.wants.get(want_id).expect("beta want should exist");
- assert_eq!(
- beta_want.status.as_ref().unwrap().code,
- WantStatusCode::WantUpstreamBuilding as i32,
- "want should be in WantUpstreamBuilding status after dep miss"
- );
- assert_eq!(
- beta_want.upstreams.len(),
- 1,
- "want should have one upstream"
- );
- assert_eq!(
- beta_want.upstreams[0].r#ref, alpha_partition,
- "upstream should be alpha"
- );
- }
-
  #[test]
  #[ignore]
  fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
@@ -793,9 +581,7 @@ mod tests {
  e.partitions = vec!["mypart".into()];

  let mut state = BuildState::default();
- state
- .handle_event(&e.clone().into())
- .expect("want create failed");
+ state.handle_event(&e.clone().into());
  let want = state.get_want("1234").unwrap();
  let mut expected: WantDetail = e.into();
  // Into will set this field as current timestamp
@@ -810,16 +596,12 @@ mod tests {
  e.partitions = vec!["mypart".into()];

  let mut state = BuildState::default();
- state
- .handle_event(&e.clone().into())
- .expect("want create failed");
+ state.handle_event(&e.clone().into());

  // Should be able to cancel
  let mut e = WantCancelEventV1::default();
  e.want_id = "1234".to_string();
- state
- .handle_event(&e.clone().into())
- .expect("want cancel failed");
+ state.handle_event(&e.clone().into());
  let want = state.get_want("1234").unwrap();

  assert_eq!(
@@ -111,7 +111,7 @@ impl WantTimestamps {

  pub fn missing_deps_to_want_events(
  missing_deps: Vec<MissingDeps>,
- job_run_id: &String,
+ job_run_id: &Uuid,
  want_timestamps: WantTimestamps,
  ) -> Vec<Event> {
  missing_deps
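Editor's note (not part of the diff): missing_deps_to_want_events now takes the job run id as &Uuid rather than &String, and the next hunk converts it with to_string() where the event payload needs text. A minimal sketch of the call-site implication, assuming the uuid crate (with the v4 feature) that the diff already imports elsewhere:

    use uuid::Uuid;

    // Old: fn missing_deps_to_want_events(..., job_run_id: &String, ...)
    // New: fn missing_deps_to_want_events(..., job_run_id: &Uuid, ...)
    // Hypothetical helper showing where the conversion now lives.
    fn want_source_id(job_run_id: &Uuid) -> String {
        // The event still carries a string id, so the conversion moves inside
        // the function (job_run_id.to_string()) instead of the caller cloning a String.
        job_run_id.to_string()
    }

    fn main() {
        let id = Uuid::new_v4();
        let as_text = want_source_id(&id);
        assert_eq!(as_text.len(), 36); // canonical hyphenated form
    }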
@@ -125,7 +125,7 @@ pub fn missing_deps_to_want_events(
  sla_seconds: want_timestamps.sla_seconds,
  source: Some(
  JobTriggeredEvent {
- job_run_id: job_run_id.clone(),
+ job_run_id: job_run_id.to_string(),
  }
  .into(),
  ),
@@ -181,8 +181,6 @@ enum WantStatusCode {
  WantFailed = 2;
  WantSuccessful = 3;
  WantCanceled = 4;
- WantUpstreamBuilding = 5;
- WantUpstreamFailed = 6;
  }

  message WantDetail {
2  databuild/event_defaults.rs  Normal file
@@ -0,0 +1,2 @@
+ use uuid::Uuid;
+ use crate::{PartitionRef, WantCreateEventV1};
@@ -1,11 +1,12 @@
  use crate::data_build_event::Event;
  use crate::data_deps::JobRunDataDepResults;
- use crate::util::DatabuildError;
  use crate::{
  EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus,
  JobRunSuccessEventV1, MissingDeps, ReadDeps,
  };
+ use crate::util::DatabuildError;
  use std::collections::HashMap;
+ use std::error::Error;
  use std::io::{BufRead, BufReader};
  use std::marker::PhantomData;
  use std::process::{Child, Command, Stdio};
@@ -5,6 +5,7 @@ mod job;
  mod util;
  mod build_state;
  mod event_transforms;
+ mod event_defaults;
  mod data_deps;
  mod mock_job_run;
@@ -1,6 +1,4 @@
  use std::collections::HashMap;
- use crate::data_deps::DataDepLogLine;
- use crate::{JobRunMissingDeps, MissingDeps};

  pub struct MockJobRun {
  sleep_ms: u64,
@@ -53,16 +51,6 @@ impl MockJobRun {
  self
  }

- pub fn dep_miss(self, missing_deps: Vec<MissingDeps>) -> Self {
- self.exit_code(1)
- .stdout_msg(
- &DataDepLogLine::DepMiss(JobRunMissingDeps {
- version: "1".to_string(),
- missing_deps,
- }).into()
- )
- }
-
  pub fn to_env(&self) -> HashMap<String, String> {
  let mut env = HashMap::new();
  env.insert(
@@ -1,13 +1,12 @@
  use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
+ use crate::build_state::BuildState;
  use crate::data_build_event::Event;
  use crate::job::JobConfiguration;
- use crate::job_run::{
- CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun,
- RunningJobRun, SubProcessBackend,
- };
+ use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend};
  use crate::util::DatabuildError;
  use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
  use std::collections::HashMap;
+ use std::error::Error;
  use std::fmt::Debug;

  /**
@@ -177,8 +176,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
  JobRunVisitResult::Completed(completed) => {
  // Emit success event
  println!("Completed job: {:?}", completed.id());
- self.bel
- .append_event(&completed.state.to_event(&completed.id()))?;
+ self.bel.append_event(&completed.state.to_event(&completed.id()))?;
  self.completed_jobs.push(completed);
  }
  JobRunVisitResult::Failed(failed) => {
@@ -190,8 +188,9 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
  }
  JobRunVisitResult::DepMiss(dep_miss) => {
  println!("Dep miss job: {:?}", dep_miss.job_run_id);
- let event = dep_miss.state.to_event(&dep_miss.id());
+ for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
  self.bel.append_event(&event)?;
+ }
  self.dep_miss_jobs.push(dep_miss);
  }
  }
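Editor's note (not part of the diff): the DepMiss branch now asks BuildState for the full set of implied events and appends each one, instead of appending only the literal dep-miss event. A condensed, self-contained sketch of that loop with stand-in types; the real code calls self.bel.state.dep_miss_to_events(&dep_miss) and self.bel.append_event(&event):

    #[derive(Clone, Debug)]
    enum Event { JobRunMissingDeps(String), WantCreate(String) }

    // Stand-in for BuildState::dep_miss_to_events: the literal dep-miss event first,
    // then one want-create per missing upstream partition.
    fn dep_miss_to_events(job_run_id: &str, missing: &[&str]) -> Vec<Event> {
        let mut events = vec![Event::JobRunMissingDeps(job_run_id.to_string())];
        events.extend(missing.iter().map(|p| Event::WantCreate((*p).to_string())));
        events
    }

    fn main() {
        let mut bel: Vec<Event> = Vec::new(); // stand-in for the build event log
        for event in dep_miss_to_events("job_123", &["data/alpha"]) {
            bel.push(event); // the orchestrator appends each derived event here
        }
        assert_eq!(bel.len(), 2);
    }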
@@ -449,91 +448,98 @@ mod tests {
  /// Orchestrator want creation is the means of data dependency propagation, allowing the
  /// orchestrator to create partitions needed by jobs that produce the existing wanted partitions.
  mod want_create {
+ use crate::data_build_event::Event;
+ use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
+ use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
+ use crate::{
+ JobRunBufferEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1,
+ };
+ use std::marker::PhantomData;
+ use uuid::Uuid;

- // /// Use case: The orchestrator should map a failed job into a set of wants
+ /// Use case: The orchestrator should map a failed job into a set of wants
- // #[test]
+ #[test]
- // fn test_job_fail_want_mapping() {
+ fn test_job_fail_want_mapping() {
- // // Given a
+ // Given a
- // let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
+ let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
- // // Add event for originating want
+ // Add event for originating want
- // let want_create = WantCreateEventV1::sample();
+ let want_create = WantCreateEventV1::sample();
- // let building_partitions = vec!["data/beta".into()];
+ let building_partitions = vec!["data/beta".into()];
- // orchestrator
+ orchestrator
- // .bel
+ .bel
- // .append_event(&Event::WantCreateV1(WantCreateEventV1 {
+ .append_event(&Event::WantCreateV1(WantCreateEventV1 {
- // partitions: building_partitions.clone(),
+ partitions: building_partitions.clone(),
- // ..want_create.clone()
+ ..want_create.clone()
- // }))
+ }))
- // .expect("event append");
+ .expect("event append");
- // // Create failed job run detail
+ // Create failed job run detail
- // let want_attributed_partitions: Vec<WantAttributedPartitions> =
+ let want_attributed_partitions: Vec<WantAttributedPartitions> =
- // vec![want_create.clone().into()];
+ vec![want_create.clone().into()];
- // let job_run_id = Uuid::new_v4();
+ let job_run_id = Uuid::new_v4();
- // let job_run = JobRunBufferEventV1 {
+ let job_run = JobRunBufferEventV1 {
- // job_run_id: job_run_id.into(),
+ job_run_id: job_run_id.into(),
- // building_partitions: building_partitions.clone(),
+ building_partitions: building_partitions.clone(),
- // want_attributed_partitions: want_attributed_partitions.clone(),
+ want_attributed_partitions: want_attributed_partitions.clone(),
- // ..JobRunBufferEventV1::default()
+ ..JobRunBufferEventV1::default()
- // };
+ };
- // orchestrator
+ orchestrator
- // .bel
+ .bel
- // .append_event(&Event::JobRunBufferV1(job_run))
+ .append_event(&Event::JobRunBufferV1(job_run))
- // .expect("event append");
+ .expect("event append");
- //
+
- // // Job runs should not be empty
+ // Job runs should not be empty
- // orchestrator
+ orchestrator
- // .bel
+ .bel
- // .state
+ .state
- // .get_job_run(&job_run_id.to_string())
+ .get_job_run(&job_run_id.to_string())
- // .expect("job run should exist");
+ .expect("job run should exist");
- //
+
- // // Add event for job failure
+ // Add event for job failure
- // let dep_miss_job_run = DepMissJobRun {
+ let dep_miss_job_run = DepMissJobRun {
- // job_run_id,
+ job_run_id,
- // state: SubProcessDepMiss {
+ state: SubProcessDepMiss {
- // stdout_buffer: vec![],
+ stdout_buffer: vec![],
- // missing_deps: vec![MissingDeps {
+ missing_deps: vec![MissingDeps {
- // impacted: vec!["data/beta".into()],
+ impacted: vec!["data/beta".into()],
- // missing: vec!["data/alpha".into()],
+ missing: vec!["data/alpha".into()],
- // }],
+ }],
- // read_deps: vec![],
+ read_deps: vec![],
- // },
+ },
- // _backend: PhantomData,
+ _backend: PhantomData,
- // };
+ };
- //
+
- // // When calculating events from dep miss
+ // When calculating events from dep miss
- // // TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
+ // TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
- // let dep_miss_event = dep_miss_job_run.state.to_event(&dep_miss_job_run.id());;
+ let events = orchestrator
- // let events = orchestrator
+ .bel
- // .bel
+ .state
- // .state
+ .dep_miss_to_events(&dep_miss_job_run)
- // .handle_job_run_dep_miss(&dep_miss_event)
+ .unwrap();
- // .unwrap();
- //
+ // Should have scheduled a job for alpha
- // // Should have scheduled a job for alpha
+ assert_eq!(
- // assert_eq!(
+ events
- // events
+ .iter()
- // .iter()
+ .filter(|e| match e {
- // .filter(|e| match e {
+ Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
- // Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
+ _ => false,
- // _ => false,
+ })
- // })
+ .count(),
- // .count(),
+ 1
- // 1
+ );
- // );
+ assert!(
- // assert!(
+ orchestrator.not_started_jobs.is_empty(),
- // orchestrator.not_started_jobs.is_empty(),
+ "shouldn't have scheduled yet"
- // "shouldn't have scheduled yet"
+ );
- // );
- //
+ // Should schedule job after we poll wants
- // // Should schedule job after we poll wants
+ orchestrator.poll_wants().expect("poll wants");
- // orchestrator.poll_wants().expect("poll wants");
+ assert_eq!(
- // assert_eq!(
+ orchestrator.not_started_jobs.len(),
- // orchestrator.not_started_jobs.len(),
+ 1,
- // 1,
+ "should have scheduled job"
- // "should have scheduled job"
+ );
- // );
+ }
- // }
  }

  /// Orchestrator needs to be able to achieve high level orchestration use cases.
@@ -605,197 +611,15 @@ mod tests {
  );
  }

- /// Helper to wait for running jobs to complete with timeout
- fn wait_for_jobs_to_complete<S: crate::build_event_log::BELStorage + std::fmt::Debug>(
- orchestrator: &mut crate::orchestrator::Orchestrator<S>,
- max_steps: usize,
- ) -> Result<(), String> {
- use std::thread;
- use std::time::Duration;
-
- for _i in 0..max_steps {
- thread::sleep(Duration::from_millis(50));
- if orchestrator.running_jobs.is_empty() {
- return Ok(());
- }
- orchestrator
- .step()
- .map_err(|e| format!("step failed: {}", e))?;
- }
-
- Err(format!("Jobs did not complete after {} steps", max_steps))
- }
-
  // Use case: a graph with multi-hop deps should achieve the multi-hop build
  // - Job B depends on part_a produced by job A
  // - Job B should be attempted, fail, and create a want for part_a
  // - Job A should be attempted, succeed, and produce part_a
  // - Job B should be attempted, succeed, and produce part_b
  #[test]
+ #[ignore]
  fn test_multi_hop_want_builds_partition() {
- use crate::job::JobConfiguration;
- use crate::orchestrator::OrchestratorConfig;
- use std::fs;
- use std::os::unix::fs::PermissionsExt;
-
- // Clean up marker file from any previous runs
- let marker_file = "/tmp/databuild_test_alpha_complete";
- let _ = fs::remove_file(marker_file);
-
- // Create inline test scripts in /tmp
- let alpha_script = "/tmp/test_job_alpha.sh";
- let beta_script = "/tmp/test_job_beta.sh";
-
- // Alpha job: creates marker file and outputs success
- fs::write(
- alpha_script,
- r#"#!/bin/bash
- touch /tmp/databuild_test_alpha_complete
- echo '{"DataDepLogLine":{"Success":{"version":"1","produced_partitions":["data/alpha"]}}}'
- "#,
- )
- .unwrap();
-
- // Beta job: checks for alpha marker, outputs dep miss if not found
- fs::write(beta_script, r#"#!/bin/bash
- if [ ! -f /tmp/databuild_test_alpha_complete ]; then
- echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
- exit 1
- fi
- echo 'Beta succeeded'
- "#).unwrap();
-
- // Make scripts executable
- fs::set_permissions(alpha_script, fs::Permissions::from_mode(0o755)).unwrap();
- fs::set_permissions(beta_script, fs::Permissions::from_mode(0o755)).unwrap();
-
- // Given: Set up orchestrator with alpha and beta jobs using test scripts
- let mut orchestrator = build_orchestrator();
- orchestrator.config = OrchestratorConfig {
- jobs: vec![
- JobConfiguration {
- label: "alpha".to_string(),
- patterns: vec!["data/alpha".to_string()],
- entry_point: alpha_script.to_string(),
- },
- JobConfiguration {
- label: "beta".to_string(),
- patterns: vec!["data/beta".to_string()],
- entry_point: beta_script.to_string(),
- },
- ],
- };
-
- let partition_beta = "data/beta";
- let partition_alpha = "data/alpha";
-
- // Create initial want for beta partition
- orchestrator
- .bel
- .append_event(&Event::WantCreateV1(WantCreateEventV1 {
- partitions: vec![partition_beta.into()],
- ..WantCreateEventV1::sample()
- }))
- .expect("event append");
-
- // When: Run orchestrator steps to let it naturally handle the multi-hop build
- // Step 1: Should schedule beta job (want -> not_started_jobs)
- orchestrator.step().expect("step 1");
- assert_eq!(
- orchestrator.not_started_jobs.len(),
- 1,
- "beta job should be queued"
- );
-
- // Step 2: Should start beta job (not_started_jobs -> running_jobs)
- orchestrator.step().expect("step 2");
- assert_eq!(
- orchestrator.running_jobs.len(),
- 1,
- "beta job should be running"
- );
-
- // Step 3: Beta job detects missing alpha dep and creates want
- wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
- // (Beta should now be in dep_miss state, and a want for alpha should be created)
- assert_eq!(
- orchestrator.dep_miss_jobs.len(),
- 1,
- "beta should have dep miss"
- );
-
- // Step 4: Should schedule and start alpha job
- // (dep miss handler created the alpha want, which will be picked up by poll_wants)
- orchestrator.step().expect("step 4");
- assert_eq!(
- orchestrator.running_jobs.len(),
- 1,
- "alpha job should be running"
- );
-
- // Step 6: Alpha completes successfully
- wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
- assert_eq!(
- orchestrator.completed_jobs.len(),
- 1,
- "alpha should complete"
- );
- assert_eq!(
- orchestrator
- .bel
- .state
- .get_partition(partition_alpha)
- .unwrap()
- .status,
- Some(PartitionStatusCode::PartitionLive.into()),
- "alpha partition should be live"
- );
-
- // Step 7: Beta is rescheduled and started (want -> running_jobs)
- orchestrator.step().expect("step 7");
- assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running");
-
- // Step 8: Beta completes successfully
- wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
-
- // Then: Verify both partitions are live and both jobs completed
- assert_eq!(
- orchestrator.completed_jobs.len(),
- 2,
- "both jobs should complete"
- );
- assert_eq!(
- orchestrator.dep_miss_jobs.len(),
- 1,
- "should have one dep miss"
- );
- assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail");
-
- assert_eq!(
- orchestrator
- .bel
- .state
- .get_partition(partition_alpha)
- .unwrap()
- .status,
- Some(PartitionStatusCode::PartitionLive.into()),
- "alpha partition should be live"
- );
- assert_eq!(
- orchestrator
- .bel
- .state
- .get_partition(partition_beta)
- .unwrap()
- .status,
- Some(PartitionStatusCode::PartitionLive.into()),
- "beta partition should be live after multi-hop build"
- );
-
- // Cleanup
- let _ = fs::remove_file(marker_file);
- let _ = fs::remove_file(alpha_script);
- let _ = fs::remove_file(beta_script);
+ todo!()
  }
  }