parent 5361e295e0
commit 2cf778a07b
13 changed files with 737 additions and 316 deletions

MODULE.bazel (14 changed lines)
@@ -3,15 +3,21 @@ module(
     version = "0.1",
 )
 
-bazel_dep(name = "bazel_skylib", version = "1.8.1")
-bazel_dep(name = "platforms", version = "0.0.11")
-bazel_dep(name = "rules_shell", version = "0.4.0")
+bazel_dep(name = "bazel_skylib", version = "1.8.2")
+bazel_dep(name = "platforms", version = "1.0.0")
+bazel_dep(name = "rules_shell", version = "0.6.1")
 bazel_dep(name = "rules_oci", version = "2.2.6")
 bazel_dep(name = "aspect_bazel_lib", version = "2.14.0")
-bazel_dep(name = "rules_rust", version = "0.61.0")
+bazel_dep(name = "rules_rust", version = "0.67.0")
 bazel_dep(name = "rules_proto", version = "7.0.2")
 bazel_dep(name = "protobuf", version = "29.0", repo_name = "com_google_protobuf")
 
+#rust = use_extension("@rules_rust//rust:extensions.bzl", "rust")
+#rust.toolchain(
+# edition = "2024",
+# versions = ["1.91.1"],
+#)
+
 crate = use_extension("@rules_rust//crate_universe:extensions.bzl", "crate")
 crate.spec(
     features = ["derive"],
File diff suppressed because one or more lines are too long
@@ -40,6 +40,7 @@ rust_test(
     name = "databuild_test",
     crate = ":databuild",
     data = ["//databuild/test:test_job_helper"],
+    env = {"RUST_BACKTRACE": "1"},
 )
 
 # Legacy filegroup for backwards compatibility
@@ -6,15 +6,15 @@ use std::error::Error;
 use std::fmt::Debug;
 use std::time::{SystemTime, UNIX_EPOCH};
 use crate::build_state::BuildState;
-use crate::util::current_timestamp;
+use crate::util::{current_timestamp, DatabuildError};
 
 pub trait BELStorage {
-    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>>;
+    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
     fn list_events(
         &self,
         since_idx: u64,
         limit: u64,
-    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
+    ) -> Result<Vec<DataBuildEvent>, DatabuildError>;
 }
 
 #[derive(Debug, Clone)]
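The BELStorage trait above is what lets the build event log swap between the in-memory and SQLite backends that follow. A minimal, self-contained sketch of the same trait-plus-in-memory-implementation pattern (simplified stand-in types, not the crate's actual API):

    #[derive(Debug, Clone)]
    struct Event(String);

    #[derive(Debug)]
    struct StorageError(String);

    trait EventStorage {
        // Returns the index assigned to the appended event.
        fn append_event(&mut self, event: &Event) -> Result<u64, StorageError>;
        // Returns up to `limit` events recorded after `since_idx`.
        fn list_events(&self, since_idx: u64, limit: u64) -> Vec<Event>;
    }

    #[derive(Default)]
    struct MemoryStorage {
        events: Vec<Event>,
    }

    impl EventStorage for MemoryStorage {
        fn append_event(&mut self, event: &Event) -> Result<u64, StorageError> {
            self.events.push(event.clone());
            Ok((self.events.len() - 1) as u64)
        }

        fn list_events(&self, since_idx: u64, limit: u64) -> Vec<Event> {
            self.events
                .iter()
                .skip(since_idx as usize)
                .take(limit as usize)
                .cloned()
                .collect()
        }
    }

    fn main() {
        let mut storage = MemoryStorage::default();
        let idx = storage.append_event(&Event("want_create".into())).unwrap();
        assert_eq!(idx, 0);
        assert_eq!(storage.list_events(0, 10).len(), 1);
    }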
@@ -35,7 +35,7 @@ impl MemoryBELStorage {
 }
 
 impl BELStorage for MemoryBELStorage {
-    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
+    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
         let timestamp = current_timestamp();
         let dbe = DataBuildEvent {
             timestamp,
@@ -50,7 +50,7 @@ impl BELStorage for MemoryBELStorage {
         &self,
         since_idx: u64,
         limit: u64,
-    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
+    ) -> Result<Vec<DataBuildEvent>, DatabuildError> {
         Ok(self
             .events
             .iter()
@@ -67,7 +67,7 @@ struct SqliteBELStorage {
 }
 
 impl SqliteBELStorage {
-    fn create(database_url: &str) -> Result<SqliteBELStorage, Box<dyn Error>> {
+    fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
         let connection = Connection::open(database_url)?;
 
         // Create the events table
@@ -85,7 +85,7 @@ impl SqliteBELStorage {
 }
 
 impl BELStorage for SqliteBELStorage {
-    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
+    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
         let now = SystemTime::now();
         let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
         let timestamp = duration_since_epoch.as_nanos() as u64;
@@ -113,7 +113,7 @@ impl BELStorage for SqliteBELStorage {
         &self,
         since_idx: u64,
         limit: u64,
-    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
+    ) -> Result<Vec<DataBuildEvent>, DatabuildError> {
         let mut stmt = self.connection.prepare(
             "SELECT event_id, timestamp, event_data FROM events
             WHERE timestamp > ?1
@@ -164,8 +164,8 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
         BuildEventLog { storage, state }
     }
 
-    pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
-        self.state.handle_event(&event);
+    pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
+        self.state.handle_event(&event)?;
         let idx = self.storage.append_event(event)?;
         Ok(idx)
     }
@@ -1,10 +1,15 @@
 use crate::data_build_event::Event;
-use crate::util::current_timestamp;
+use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
+use crate::job_run::{DepMissJobRun, SubProcessBackend};
+use crate::util::{current_timestamp, DatabuildError};
 use crate::{
-    JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
-    ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
-    ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintDetail, WantDetail,
-    WantStatusCode,
+    JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
+    JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
+    JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
+    ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
+    ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
+    TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
+    WantDetail, WantStatusCode,
 };
 use rusqlite::types::FromSql;
 use rusqlite::ToSql;
@@ -35,10 +40,10 @@ and updates, which is exceptionally fast.
 
 #[derive(Debug, Clone)]
 pub struct BuildState {
-    pub wants: BTreeMap<String, WantDetail>,
-    pub taints: BTreeMap<String, TaintDetail>,
-    pub partitions: BTreeMap<String, PartitionDetail>,
-    pub job_runs: BTreeMap<String, JobRunDetail>,
+    wants: BTreeMap<String, WantDetail>,
+    taints: BTreeMap<String, TaintDetail>,
+    partitions: BTreeMap<String, PartitionDetail>,
+    job_runs: BTreeMap<String, JobRunDetail>,
 }
 
 impl Default for BuildState {
@@ -53,21 +58,202 @@ impl Default for BuildState {
 }
 
 impl BuildState {
-    pub fn handle_event(&mut self, event: &Event) -> () {
+    pub fn count_job_runs(&self) -> usize {
+        self.job_runs.len()
+    }
+
+    /// Handles reacting to events, updating state, and erroring if its an invalid state transition
+    pub fn handle_event(&mut self, event: &Event) -> Result<(), DatabuildError> {
         match event {
-            Event::WantCreateV1(e) => {
-                self.wants.insert(e.want_id.clone(), e.clone().into());
-            }
-            Event::WantCancelV1(e) => {
-                if let Some(want) = self.wants.get_mut(&e.want_id) {
-                    want.status = Some(WantStatusCode::WantCanceled.into());
-                    want.last_updated_timestamp = current_timestamp();
+            // JobRun events
+            Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
+            Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e),
+            Event::JobRunFailureV1(e) => self.handle_job_run_failure(e),
+            Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e),
+            Event::JobRunSuccessV1(e) => self.handle_job_run_success(e),
+            Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e),
+            // Want events
+            Event::WantCreateV1(e) => self.handle_want_create(e),
+            Event::WantCancelV1(e) => self.handle_want_cancel(e),
+            // Taint events
+            Event::TaintCreateV1(e) => self.handle_taint_create(e),
+            Event::TaintDeleteV1(e) => self.handle_taint_delete(e),
+            // Ruh roh!
+            _ => panic!("Unhandled event type! {:?}", event),
+        }
+    }
+
+    fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<(), DatabuildError> {
+        self.wants
+            .insert(event.want_id.clone(), event.clone().into());
+        Ok(())
+    }
+
+    fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
+        if let Some(want) = self.wants.get_mut(&event.want_id) {
+            want.status = Some(WantStatusCode::WantCanceled.into());
+            want.last_updated_timestamp = current_timestamp();
+        }
+        Ok(())
+    }
+
+    fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<(), DatabuildError> {
+        // No job run should exist
+        if self.job_runs.get(&event.job_run_id).is_some() {
+            Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
+        } else {
+            // Create job run to be inserted
+            let job_run: JobRunDetail = event.clone().into();
+
+            for pref in job_run.building_partitions.iter() {
+                // Update all wants that point to this partition ref to `Building`
+                // Query notes: "update all wants that point to this partition to building"
+                if let Some(want) = self.wants.get_mut(&pref.r#ref) {
+                    want.status = Some(WantStatusCode::WantBuilding.into());
                 }
             }
-            _ => (),
+
+            self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
+            println!("Inserted job run: {:?}", job_run);
+            Ok(())
         }
     }
 
+    fn update_job_run_status(
+        &mut self,
+        job_run_id: &str,
+        status: JobRunStatusCode,
+    ) -> Result<(), DatabuildError> {
+        if let Some(job_run) = self.job_runs.get_mut(job_run_id) {
+            job_run.last_heartbeat_at = Some(current_timestamp());
+            job_run.status = Some(status.into());
+            Ok(())
+        } else {
+            Err(format!("Job run ID {} not found", job_run_id).into())
+        }
+    }
+
+    fn update_partition_status(
+        &mut self,
+        pref: &PartitionRef,
+        status: PartitionStatusCode,
+        job_run_id: Option<&str>,
+    ) -> Result<(), DatabuildError> {
+        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
+            partition.status = Some(status.clone().into());
+            partition.last_updated_timestamp = Some(current_timestamp());
+            if let Some(job_run_id) = job_run_id.map(str::to_string) {
+                if !partition.job_run_ids.contains(&job_run_id) {
+                    partition.job_run_ids.push(job_run_id);
+                }
+            }
+        } else {
+            // Partition doesn't exist yet, needs to be inserted
+            let want_ids = if let Some(jrid) = job_run_id {
+                let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
+                job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
+            } else {
+                vec![]
+            };
+
+            let partition = PartitionDetail {
+                r#ref: Some(pref.clone()),
+                status: Some(status.into()),
+                last_updated_timestamp: Some(current_timestamp()),
+                job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
+                want_ids,
+                ..PartitionDetail::default()
+            };
+            self.partitions.insert(pref.r#ref.clone(), partition);
+        };
+
+        self.update_wants_for_partition(&pref)
+    }
+
+    /// Walks the state from this want ID to update its status.
+    fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
+        if let Some(want) = self.wants.get(want_id) {
+            let details: Vec<Option<PartitionDetail>> = want
+                .upstreams
+                .iter()
+                .map(|pref| self.get_partition(&pref.r#ref))
+                .collect();
+            let status: WantStatusCode = details.into();
+            if let Some(mut_want) = self.wants.get_mut(want_id) {
+                mut_want.status = Some(status.into());
+                mut_want.last_updated_timestamp = current_timestamp();
+            }
+            Ok(())
+        } else {
+            Err(format!("Want id {} not found", want_id).into())
+        }
+    }
+
+    fn handle_job_run_heartbeat(
+        &mut self,
+        event: &JobRunHeartbeatEventV1,
+    ) -> Result<(), DatabuildError> {
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)
+    }
+
+    fn handle_job_run_success(
+        &mut self,
+        event: &JobRunSuccessEventV1,
+    ) -> Result<(), DatabuildError> {
+        println!("Job run success event: {:?}", event);
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
+        let job_run = self.get_job_run(&event.job_run_id).unwrap();
+        // Update partitions being build by this job
+        for pref in job_run.building_partitions {
+            self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
+        }
+        Ok(())
+    }
+
+    fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
+        // todo!("Go to every want that references this partition and update its status")
+        let want_ids = self
+            .partitions
+            .get(&pref.r#ref)
+            .map(|p| p.want_ids.clone())
+            .ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
+        for want_id in want_ids.iter() {
+            self.update_want_status(want_id)?;
+        }
+        Ok(())
+    }
+
+    fn handle_job_run_failure(
+        &mut self,
+        event: &JobRunFailureEventV1,
+    ) -> Result<(), DatabuildError> {
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
+        let job_run = self.get_job_run(&event.job_run_id).unwrap();
+        for pref in job_run.building_partitions {
+            self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
+        }
+        Ok(())
+    }
+
+    fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<(), DatabuildError> {
+        todo!("should update already inserted job run, partition status, want status")
+    }
+
+    fn handle_job_run_dep_miss(
+        &mut self,
+        event: &JobRunMissingDepsEventV1,
+    ) -> Result<(), DatabuildError> {
+        todo!("should update already inserted job run, schedule wants...?")
+    }
+
+    fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<(), DatabuildError> {
+        todo!("...?")
+    }
+
+    fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> {
+        todo!("...?")
+    }
+
     fn with_wants(self, wants: BTreeMap<String, WantDetail>) -> Self {
         Self { wants, ..self }
     }
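The reworked BuildState above follows a common event-sourcing shape: one enum of events, one dispatcher, and per-event handlers that either mutate the projected state or reject an invalid transition. A simplified, self-contained sketch of that shape (stand-in types only, not the real crate types):

    use std::collections::BTreeMap;

    #[derive(Debug, Clone)]
    enum Event {
        JobRunBuffer { job_run_id: String },
        JobRunSuccess { job_run_id: String },
    }

    #[derive(Default, Debug)]
    struct State {
        job_runs: BTreeMap<String, String>, // id -> status
    }

    impl State {
        // Each event either updates state or reports an invalid transition.
        fn handle_event(&mut self, event: &Event) -> Result<(), String> {
            match event {
                Event::JobRunBuffer { job_run_id } => {
                    if self.job_runs.contains_key(job_run_id) {
                        return Err(format!("Job run ID collision on {}", job_run_id));
                    }
                    self.job_runs.insert(job_run_id.clone(), "queued".into());
                    Ok(())
                }
                Event::JobRunSuccess { job_run_id } => {
                    let status = self
                        .job_runs
                        .get_mut(job_run_id)
                        .ok_or(format!("Job run ID {} not found", job_run_id))?;
                    *status = "succeeded".into();
                    Ok(())
                }
            }
        }
    }

    fn main() {
        let mut state = State::default();
        let id = "run-1".to_string();
        state.handle_event(&Event::JobRunBuffer { job_run_id: id.clone() }).unwrap();
        state.handle_event(&Event::JobRunSuccess { job_run_id: id }).unwrap();
        assert_eq!(state.job_runs["run-1"], "succeeded");
    }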
@@ -186,6 +372,39 @@ impl BuildState {
                 .collect(),
         )
     }
+
+    /// Maps a dep miss into the BEL events it implies, so that the job can be run successfully later
+    pub fn dep_miss_to_events(
+        &self,
+        dep_miss: &DepMissJobRun<SubProcessBackend>,
+    ) -> Result<Vec<Event>, DatabuildError> {
+        let mut events = vec![];
+        // Append literal job run dep miss
+        events.push(dep_miss.state.to_event(&dep_miss.id()));
+        // Append wants from dep miss
+        let job_run_detail = self
+            .get_job_run(&dep_miss.job_run_id.to_string())
+            .ok_or(format!(
+                "Unable to find job run with id `{}`",
+                dep_miss.job_run_id
+            ))?;
+        // Infer data/SLA timestamps from upstream want
+        let want_timestamps: WantTimestamps = job_run_detail
+            .servicing_wants
+            .iter()
+            .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
+            .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
+            .ok_or(format!("No servicing wants found"))?;
+        // Create wants from dep misses
+        let want_events = missing_deps_to_want_events(
+            dep_miss.state.missing_deps.clone(),
+            &dep_miss.job_run_id,
+            want_timestamps,
+        );
+        events.extend(want_events);
+
+        Ok(events)
+    }
 }
 
 /// The status of partitions required by a want to build (sensed from dep miss job run)
@@ -53,7 +53,8 @@ message WantAttributedPartitions {
 message JobRunBufferEventV1 {
   string job_run_id = 1;
   string job_label = 2;
-  repeated WantAttributedPartitions want_attributed_partitions = 3;
+  repeated PartitionRef building_partitions = 3;
+  repeated WantAttributedPartitions want_attributed_partitions = 4;
   // TODO how do we handle buffer definition? Start simple, noop until we want something here?
 }
 // Just indicates that job has entered queue
@@ -189,6 +190,7 @@ message PartitionDetail {
   optional uint64 last_updated_timestamp = 3;
   // IDs that associate the partition with other objects
   repeated string job_run_ids = 4;
+  // Wants that reference this partition
   repeated string want_ids = 5;
   repeated string taint_ids = 6;
 }
@@ -225,7 +227,8 @@ message JobRunDetail {
   string id = 1;
   JobRunStatus status = 2;
   optional uint64 last_heartbeat_at = 3;
-  repeated WantAttributedPartitions servicing_wants = 4;
+  repeated PartitionRef building_partitions = 4;
+  repeated WantAttributedPartitions servicing_wants = 5;
 }
 
@@ -1,10 +1,7 @@
 use crate::data_build_event::Event;
 use crate::util::current_timestamp;
-use crate::{
-    event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent,
-    ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1,
-    WantCreateEventV1, WantDetail, WantStatus, WantStatusCode,
-};
+use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
+use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
 
 impl From<&WantCreateEventV1> for WantDetail {
     fn from(e: &WantCreateEventV1) -> Self {
@@ -38,6 +35,15 @@ impl From<WantCancelEventV1> for Event {
     }
 }
 
+impl From<WantCreateEventV1> for WantAttributedPartitions {
+    fn from(value: WantCreateEventV1) -> Self {
+        Self {
+            want_id: value.want_id,
+            partitions: value.partitions,
+        }
+    }
+}
+
 impl From<WantStatusCode> for WantStatus {
     fn from(code: WantStatusCode) -> Self {
         WantStatus {
@@ -47,6 +53,50 @@ impl From<WantStatusCode> for WantStatus {
     }
 }
 
+impl From<JobRunBufferEventV1> for JobRunDetail {
+    fn from(value: JobRunBufferEventV1) -> Self {
+        Self {
+            id: value.job_run_id,
+            status: Some(JobRunStatusCode::JobRunQueued.into()),
+            last_heartbeat_at: None,
+            building_partitions: value.building_partitions,
+            servicing_wants: value.want_attributed_partitions,
+        }
+    }
+}
+
+pub fn want_status_matches_any(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
+    pds.iter()
+        .any(|pd| pd.clone()
+            .map(|pd| pd.status == Some(status.into()))
+            .unwrap_or(false))
+}
+
+pub fn want_status_matches_all(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
+    pds.iter()
+        .all(|pd| pd.clone()
+            .map(|pd| pd.status == Some(status.into()))
+            .unwrap_or(false))
+}
+
+/// Merges a list of partition details into a single status code.
+/// Takes the lowest state as the want status.
+impl Into<WantStatusCode> for Vec<Option<PartitionDetail>> {
+
+    fn into(self) -> WantStatusCode {
+        if want_status_matches_any(&self, PartitionFailed) {
+            WantStatusCode::WantFailed
+        } else if want_status_matches_all(&self, PartitionLive) {
+            WantStatusCode::WantSuccessful
+        } else if self.iter().any(|pd| pd.is_none()) {
+            WantStatusCode::WantBuilding
+        } else {
+            WantStatusCode::WantIdle
+        }
+    }
+}
+
 impl From<&str> for PartitionRef {
     fn from(value: &str) -> Self {
         Self {
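The Into<WantStatusCode> impl above merges partition statuses with a fixed precedence: any failed partition fails the want, all-live partitions complete it, any missing partition keeps it building, and anything else is idle. A stand-alone sketch of the same rule, using simplified enums rather than the generated proto types:

    #[derive(Clone, Copy, PartialEq, Debug)]
    enum PartitionStatus {
        Live,
        Failed,
        Building,
    }

    #[derive(PartialEq, Debug)]
    enum WantStatus {
        Failed,
        Successful,
        Building,
        Idle,
    }

    // Mirrors the precedence in the diff: Failed > Successful > Building > Idle.
    fn merge(partitions: &[Option<PartitionStatus>]) -> WantStatus {
        if partitions.iter().any(|p| *p == Some(PartitionStatus::Failed)) {
            WantStatus::Failed
        } else if partitions.iter().all(|p| *p == Some(PartitionStatus::Live)) {
            WantStatus::Successful
        } else if partitions.iter().any(|p| p.is_none()) {
            WantStatus::Building
        } else {
            WantStatus::Idle
        }
    }

    fn main() {
        assert_eq!(merge(&[Some(PartitionStatus::Live), None]), WantStatus::Building);
        assert_eq!(merge(&[Some(PartitionStatus::Live), Some(PartitionStatus::Failed)]), WantStatus::Failed);
        assert_eq!(merge(&[Some(PartitionStatus::Live), Some(PartitionStatus::Live)]), WantStatus::Successful);
        // As in the source, a partition that is merely building maps the want to Idle.
        assert_eq!(merge(&[Some(PartitionStatus::Building)]), WantStatus::Idle);
    }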
@@ -88,3 +138,12 @@ impl From<JobTriggeredEvent> for EventSource {
         }
     }
 }
+
+impl From<&WantDetail> for WantAttributedPartitions {
+    fn from(value: &WantDetail) -> Self {
+        Self {
+            want_id: value.want_id.clone(),
+            partitions: value.partitions.clone(),
+        }
+    }
+}
@@ -4,6 +4,7 @@ use crate::{
     EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus,
     JobRunSuccessEventV1, MissingDeps, ReadDeps,
 };
+use crate::util::DatabuildError;
 use std::collections::HashMap;
 use std::error::Error;
 use std::io::{BufRead, BufReader};
@@ -34,21 +35,21 @@ pub trait JobRunBackend: Sized {
     fn start(
         not_started: Self::NotStartedState,
         env: Option<HashMap<String, String>>,
-    ) -> Result<Self::RunningState, Box<dyn Error>>;
+    ) -> Result<Self::RunningState, DatabuildError>;
 
     /// Poll a running job for state changes
     fn poll(
         running: &mut Self::RunningState,
     ) -> Result<
         PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
-        Box<dyn Error>,
+        DatabuildError,
     >;
 
     /// Cancel a running job
     fn cancel_job(
         running: Self::RunningState,
         source: EventSource,
-    ) -> Result<Self::CanceledState, Box<dyn Error>>;
+    ) -> Result<Self::CanceledState, DatabuildError>;
 }
 
 /// Result of polling a running job
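JobRunBackend above ties a job's lifecycle states together through associated types and a poll result. The sketch below shows, with deliberately simplified stand-in types (not the crate's real trait), how a caller drives such a backend from start to completion:

    // Simplified stand-in for the PollResult/backend pattern in the hunk above.
    enum PollResult<C, F> {
        StillRunning,
        Completed(C),
        Failed(F),
    }

    trait Backend: Sized {
        type NotStarted;
        type Running;
        type Completed;
        type Failed;

        fn start(not_started: Self::NotStarted) -> Result<Self::Running, String>;
        fn poll(running: &mut Self::Running) -> Result<PollResult<Self::Completed, Self::Failed>, String>;
    }

    // A toy backend whose job "completes" after a fixed number of polls.
    struct CountdownBackend;

    impl Backend for CountdownBackend {
        type NotStarted = u32;
        type Running = u32;
        type Completed = ();
        type Failed = String;

        fn start(not_started: u32) -> Result<u32, String> {
            Ok(not_started)
        }

        fn poll(running: &mut u32) -> Result<PollResult<(), String>, String> {
            if *running == 0 {
                Ok(PollResult::Completed(()))
            } else {
                *running -= 1;
                Ok(PollResult::StillRunning)
            }
        }
    }

    fn main() -> Result<(), String> {
        let mut running = CountdownBackend::start(2)?;
        loop {
            match CountdownBackend::poll(&mut running)? {
                PollResult::StillRunning => continue,
                PollResult::Completed(()) => break,
                PollResult::Failed(err) => return Err(err),
            }
        }
        Ok(())
    }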
@@ -91,14 +92,14 @@ impl<B: JobRunBackend> NotStartedJobRun<B> {
         }
     }
 
-    pub fn run(self) -> Result<RunningJobRun<B>, Box<dyn Error>> {
+    pub fn run(self) -> Result<RunningJobRun<B>, DatabuildError> {
         self.run_with_env(None)
     }
 
     pub fn run_with_env(
         self,
         env: Option<HashMap<String, String>>,
-    ) -> Result<RunningJobRun<B>, Box<dyn Error>> {
+    ) -> Result<RunningJobRun<B>, DatabuildError> {
         let running_state = B::start(self.state, env)?;
         Ok(JobRun {
             job_run_id: self.job_run_id,
@@ -110,7 +111,7 @@ impl<B: JobRunBackend> NotStartedJobRun<B> {
 
 // Methods available only on Running state
 impl<B: JobRunBackend> RunningJobRun<B> {
-    pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, Box<dyn Error>> {
+    pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, DatabuildError> {
         match B::poll(&mut self.state)? {
             PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
             PollResult::Completed(completed_state) => {
@@ -140,7 +141,7 @@ impl<B: JobRunBackend> RunningJobRun<B> {
         }
     }
 
-    pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, Box<dyn Error>> {
+    pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, DatabuildError> {
         let canceled_state = B::cancel_job(self.state, source)?;
         Ok(JobRun {
             job_run_id: self.job_run_id,
@@ -223,7 +224,7 @@ impl JobRunBackend for SubProcessBackend {
     fn start(
         not_started: Self::NotStartedState,
         env: Option<HashMap<String, String>>,
-    ) -> Result<Self::RunningState, Box<dyn Error>> {
+    ) -> Result<Self::RunningState, DatabuildError> {
         let process = Command::new(not_started.entry_point)
             .args(not_started.args)
             .stdout(Stdio::piped())
@@ -241,7 +242,7 @@ impl JobRunBackend for SubProcessBackend {
         running: &mut Self::RunningState,
     ) -> Result<
         PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
-        Box<dyn Error>,
+        DatabuildError,
     > {
         // Non-blocking check for exit status
         if let Some(exit_status) = running.process.try_wait()? {
@@ -309,7 +310,7 @@ impl JobRunBackend for SubProcessBackend {
     fn cancel_job(
         mut running: Self::RunningState,
         source: EventSource,
-    ) -> Result<Self::CanceledState, Box<dyn Error>> {
+    ) -> Result<Self::CanceledState, DatabuildError> {
         // Kill the process
         running.process.kill()?;
 
@@ -1,30 +1,35 @@
 use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
 use crate::build_state::BuildState;
 use crate::data_build_event::Event;
-use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
 use crate::job::JobConfiguration;
 use crate::job_run::{
     CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
     SubProcessBackend,
 };
-use crate::{PartitionRef, WantDetail};
+use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
+use crate::util::DatabuildError;
 
 /**
 Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
-the visitor pattern to monitor job exec progress and liveness, and adds
-*/
+the visitor pattern to monitor job exec progress and liveness.
+
+JTBDs:
+- Orchestrator turns job run dep miss failures into derivative wants for the missed partitions
+- Orchestrator turns schedulable wants into job runs to build the requested partitions
+- Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
+  jobs when possible
+*/
 struct Orchestrator<S: BELStorage + Debug> {
-    bel: BuildEventLog<S>,
-    not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
-    running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
-    completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
-    failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
-    dep_miss_jobs: Vec<DepMissJobRun<SubProcessBackend>>,
-    config: OrchestratorConfig,
+    pub bel: BuildEventLog<S>,
+    pub not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
+    pub running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
+    pub completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
+    pub failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
+    pub dep_miss_jobs: Vec<DepMissJobRun<SubProcessBackend>>,
+    pub config: OrchestratorConfig,
 }
 
 impl Default for Orchestrator<MemoryBELStorage> {
@@ -134,10 +139,22 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
         }
     }
 
-    /** Continuously invoked function to watch job run status */
-    fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
+    fn job_runs_count(&self) -> usize {
+        self.not_started_jobs.len()
+            + self.running_jobs.len()
+            + self.completed_jobs.len()
+            + self.failed_jobs.len()
+            + self.dep_miss_jobs.len()
+    }
+
+    /// Visits individual job runs, appending resulting events, and moving runs between run status
+    /// containers.
+    fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
         use crate::job_run::JobRunVisitResult;
 
+        // Coherence check setup
+        let total_runs_count = self.job_runs_count();
+
         // First, start any not-started jobs
         while let Some(job) = self.not_started_jobs.pop() {
             let running = job.run()?;
@@ -146,25 +163,35 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
 
         // Visit running jobs and transition them to terminal states
         let mut still_running = Vec::new();
+        // TODO make sure that failure in the middle can't mess up build state - likely need to
+        // refactor here (e.g. turn state changes into data, commit them after all have been
+        // calculated and validated)
        for mut job in self.running_jobs.drain(..) {
             match job.visit()? {
                 JobRunVisitResult::StillRunning => {
+                    println!("Still running job: {:?}", job.id());
                     still_running.push(job);
                 }
                 JobRunVisitResult::Completed(completed) => {
                     // Emit success event
-                    let event: Event = completed.state.to_event(&completed.id());
-                    self.bel.append_event(&event)?;
+                    println!("Completed job: {:?}", completed.id());
+                    let result = run_complete_to_events(&self.bel.state, &completed)?;
+                    for event in result.events {
+                        self.bel.append_event(&event)?;
+                    }
+                    // Move job to completed
                     self.completed_jobs.push(completed);
                 }
                 JobRunVisitResult::Failed(failed) => {
                     // Emit failure event
+                    println!("Failed job: {:?}", failed.id());
                     let event: Event = failed.state.to_event(&failed.id());
                     self.bel.append_event(&event)?;
                     self.failed_jobs.push(failed);
                 }
                 JobRunVisitResult::DepMiss(dep_miss) => {
-                    for event in dep_miss_to_events(&self.bel.state, &dep_miss)? {
+                    println!("Dep miss job: {:?}", dep_miss.job_run_id);
+                    for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
                         self.bel.append_event(&event)?;
                     }
                     // Record missing upstream status in want details
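poll_job_runs above drains the running set, visits each job once, and re-buckets it by outcome before restoring the still-running jobs. The control flow can be sketched independently of the crate's job types (names below are illustrative only):

    enum Visit {
        StillRunning,
        Completed,
        Failed,
    }

    struct Job {
        polls_left: u32,
    }

    impl Job {
        // Toy visit: completes once its countdown reaches zero.
        fn visit(&mut self) -> Visit {
            if self.polls_left == 0 {
                Visit::Completed
            } else {
                self.polls_left -= 1;
                Visit::StillRunning
            }
        }
    }

    fn poll_jobs(running: &mut Vec<Job>, completed: &mut Vec<Job>, failed: &mut Vec<Job>) {
        let mut still_running = Vec::new();
        // Drain so every job is re-bucketed exactly once per poll.
        for mut job in running.drain(..) {
            match job.visit() {
                Visit::StillRunning => still_running.push(job),
                Visit::Completed => completed.push(job),
                Visit::Failed => failed.push(job),
            }
        }
        *running = still_running;
    }

    fn main() {
        let mut running = vec![Job { polls_left: 1 }];
        let (mut completed, mut failed) = (Vec::new(), Vec::new());
        poll_jobs(&mut running, &mut completed, &mut failed);
        assert_eq!(running.len(), 1);
        poll_jobs(&mut running, &mut completed, &mut failed);
        assert_eq!((running.len(), completed.len(), failed.len()), (0, 1, 0));
    }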
@@ -174,11 +201,18 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
             }
         }
         self.running_jobs = still_running;
+
+        // Panic because this should never happen
+        assert_eq!(
+            self.job_runs_count(),
+            total_runs_count,
+            "Detected job run count change during job run visit (should never happen)"
+        );
 
         Ok(())
     }
 
     /** Continuously invoked function to watch wants and schedule new jobs */
-    fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
+    fn poll_wants(&mut self) -> Result<(), DatabuildError> {
         // Collect unhandled wants, group by job that handles each partition,
         let schedulability = self.bel.state.schedulable_wants();
         println!("schedulability: {:?}", schedulability);
@@ -202,8 +236,17 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
             )
             .into())
         } else {
+            // Spawn jobs and add events
             for wg in grouped_wants.want_groups {
-                self.not_started_jobs.push(wg.spawn()?);
+                let job_run = wg.spawn()?;
+                let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
+                    job_run_id: job_run.job_run_id.into(),
+                    job_label: wg.job.label,
+                    building_partitions: wg.wants.iter().map(|w| w.partitions.clone()).flatten().collect(),
+                    want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
+                });
+                self.bel.append_event(&job_buffer_event)?;
+                self.not_started_jobs.push(job_run);
             }
 
             Ok(())
@@ -236,50 +279,48 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
         }
     }
 
-    fn step(&mut self) -> Result<(), Box<dyn Error>> {
+    fn step(&mut self) -> Result<(), DatabuildError> {
         self.poll_job_runs()?;
         self.poll_wants()?;
         Ok(())
     }
 
     /** Entrypoint for running jobs */
-    pub fn join(&mut self) -> Result<(), Box<dyn Error>> {
+    pub fn join(&mut self) -> Result<(), DatabuildError> {
         loop {
             self.step()?
         }
     }
 }
 
-fn dep_miss_to_events(
-    bel_state: &BuildState,
-    dep_miss: &DepMissJobRun<SubProcessBackend>,
-) -> Result<Vec<Event>, Box<dyn Error>> {
-    let mut events = vec![];
-    // Append literal job run dep miss
-    events.push(dep_miss.state.to_event(&dep_miss.id()));
-    // Append wants from dep miss
-    let job_run_detail = bel_state
-        .get_job_run(&dep_miss.job_run_id.to_string())
-        .ok_or(format!(
-            "Unable to find job run with id `{}`",
-            dep_miss.job_run_id
-        ))?;
-    // Infer data/SLA timestamps from upstream want
-    let want_timestamps: WantTimestamps = job_run_detail
-        .servicing_wants
-        .iter()
-        .flat_map(|wap| bel_state.get_want(&wap.want_id).map(|w| w.into()))
-        .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
-        .ok_or(format!("No servicing wants found"))?;
-    // Create wants from dep misses
-    let want_events = missing_deps_to_want_events(
-        dep_miss.state.missing_deps.clone(),
-        &dep_miss.job_run_id,
-        want_timestamps,
-    );
-    events.extend(want_events);
-
-    Ok(events)
+#[derive(Default, Clone, Debug)]
+pub struct JobRunCompleteResult {
+    /// Events to append to the BEL from this job completing
+    pub events: Vec<Event>,
+}
+
+/// Handle successful run completion:
+/// - Adding run success event
+/// - Updating status for partitions actually built by the job
+fn run_complete_to_events(
+    bel_state: &BuildState,
+    completed: &CompletedJobRun<SubProcessBackend>,
+) -> Result<JobRunCompleteResult, DatabuildError> {
+    let mut events = vec![
+        // Event marking completion of job
+        completed.state.to_event(&completed.id()),
+    ];
+    // let job_detail = bel_state
+    //     .get_job_run(&completed.job_run_id.to_string())
+    //     .ok_or(format!(
+    //         "No job run found for id `{}`",
+    //         completed.job_run_id
+    //     ))?;
+
+    Ok(JobRunCompleteResult {
+        // built_partitions: job_detail.building_partitions,
+        events,
+    })
 }
 
 #[cfg(test)]
@@ -405,6 +446,7 @@ mod tests {
             orchestrator.bel.append_event(&e).expect("append");
         }
         assert_eq!(orchestrator.not_started_jobs.len(), 0);
+        assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
 
         // When
        assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
@@ -425,7 +467,8 @@ mod tests {
                 .args,
             vec!["data/alpha"],
             "should have scheduled alpha job"
-        )
+        );
+        assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
     }
 
     // Use case: A schedulable want that can't be matched to a job should return an error
@@ -453,9 +496,10 @@ mod tests {
     mod want_create {
         use crate::data_build_event::Event;
         use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
-        use crate::orchestrator::dep_miss_to_events;
         use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
-        use crate::{JobRunDetail, MissingDeps, WantAttributedPartitions, WantCreateEventV1};
+        use crate::{
+            JobRunBufferEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1,
+        };
         use std::marker::PhantomData;
         use uuid::Uuid;
 
@@ -465,33 +509,37 @@ mod tests {
             // Given a
             let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
             // Add event for originating want
+            let want_create = WantCreateEventV1::sample();
+            let building_partitions = vec!["data/beta".into()];
             orchestrator
                 .bel
                 .append_event(&Event::WantCreateV1(WantCreateEventV1 {
-                    partitions: vec!["data/beta".into()],
-                    ..WantCreateEventV1::sample()
+                    partitions: building_partitions.clone(),
+                    ..want_create.clone()
                 }))
                 .expect("event append");
             // Create failed job run detail
+            let want_attributed_partitions: Vec<WantAttributedPartitions> =
+                vec![want_create.clone().into()];
             let job_run_id = Uuid::new_v4();
-            let job_run = JobRunDetail {
-                servicing_wants: orchestrator
-                    .bel
-                    .state
-                    .wants
-                    .values()
-                    .map(|w| WantAttributedPartitions {
-                        want_id: w.want_id.clone(),
-                        partitions: w.partitions.clone(),
-                    })
-                    .collect(),
-                ..JobRunDetail::default()
+            let job_run = JobRunBufferEventV1 {
+                job_run_id: job_run_id.into(),
+                building_partitions: building_partitions.clone(),
+                want_attributed_partitions: want_attributed_partitions.clone(),
+                ..JobRunBufferEventV1::default()
             };
+            orchestrator
+                .bel
+                .append_event(&Event::JobRunBufferV1(job_run))
+                .expect("event append");
+
+            // Job runs should not be empty
             orchestrator
                 .bel
                 .state
-                .job_runs
-                .insert(job_run_id.into(), job_run);
+                .get_job_run(&job_run_id.to_string())
+                .expect("job run should exist");
 
             // Add event for job failure
             let dep_miss_job_run = DepMissJobRun {
                 job_run_id,
@@ -507,7 +555,12 @@ mod tests {
             };
 
             // When calculating events from dep miss
-            let events = dep_miss_to_events(&orchestrator.bel.state, &dep_miss_job_run).unwrap();
+            // TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
+            let events = orchestrator
+                .bel
+                .state
+                .dep_miss_to_events(&dep_miss_job_run)
+                .unwrap();
 
             // Should have scheduled a job for alpha
             assert_eq!(
@@ -537,12 +590,65 @@ mod tests {
 
     /// Orchestrator needs to be able to achieve high level orchestration use cases.
     mod orchestration {
+        use crate::data_build_event::Event;
+        use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
+        use crate::{PartitionStatusCode, WantCreateEventV1};
+        use std::thread;
+        use std::time::Duration;
+
         /// Use case: should run a job to produce a partition in reaction to a want, then have the
         /// want fulfilled.
         #[test]
-        #[ignore]
         fn test_want_builds_partition() {
-            todo!()
+            // Given
+            let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
+            // Add event for originating want
+            let partition = "data/alpha";
+            orchestrator
+                .bel
+                .append_event(&Event::WantCreateV1(WantCreateEventV1 {
+                    partitions: vec![partition.into()],
+                    ..WantCreateEventV1::sample()
+                }))
+                .expect("event append");
+
+            // When
+            // Poll wants then schedule pending jobs
+            orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
+            assert_eq!(orchestrator.not_started_jobs.len(), 1);
+            // poll job runs should start job run
+            orchestrator.poll_job_runs().expect("should start run");
+            assert_eq!(orchestrator.running_jobs.len(), 1);
+            assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
+            thread::sleep(Duration::from_millis(1));
+            // Should still be running after 1ms
+            orchestrator.poll_job_runs().expect("should still be running");
+            assert_eq!(orchestrator.running_jobs.len(), 1);
+            assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
+            println!("STATE: {:?}", orchestrator.bel.state);
+
+            // Wait for it to complete
+            thread::sleep(Duration::from_millis(10));
+            orchestrator.poll_job_runs().expect("should be able to poll existing job run");
+
+            // Job run should have succeeded
+            assert!(orchestrator.not_started_jobs.is_empty());
+            assert!(orchestrator.failed_jobs.is_empty());
+            assert!(orchestrator.dep_miss_jobs.is_empty());
+            assert!(orchestrator.running_jobs.is_empty());
+            assert_eq!(orchestrator.completed_jobs.len(), 1);
+
+            // Build state should show partition as live
+            assert_eq!(
+                orchestrator
+                    .bel
+                    .state
+                    .get_partition(partition)
+                    .unwrap()
+                    .status,
+                Some(PartitionStatusCode::PartitionLive.into()),
+                "partition should be live after job run completion"
+            );
         }
 
         // Use case: a graph with multi-hop deps should achieve the multi-hop build
@@ -1,7 +1,75 @@
 use std::time::{SystemTime, UNIX_EPOCH};
+use std::backtrace::Backtrace;
 
 pub fn current_timestamp() -> u64 {
     let now = SystemTime::now();
     let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
     duration_since_epoch.as_nanos() as u64
 }
+
+fn maybe_backtrace() -> Backtrace {
+    if std::env::var("RUST_BACKTRACE").is_ok() {
+        Backtrace::force_capture()
+    } else {
+        Backtrace::disabled()
+    }
+}
+
+#[derive(Debug)]
+pub struct DatabuildError {
+    msg: String,
+    source: Option<Box<dyn std::error::Error + Send + Sync>>,
+    backtrace: Backtrace,
+}
+
+impl DatabuildError {
+    fn new(msg: impl Into<String>) -> Self {
+        Self {
+            msg: msg.into(),
+            source: None,
+            backtrace: maybe_backtrace()
+        }
+    }
+}
+
+impl From<std::io::Error> for DatabuildError {
+    fn from(err: std::io::Error) -> Self {
+        Self {
+            msg: err.to_string(),
+            source: Some(Box::new(err)),
+            backtrace: maybe_backtrace()
+        }
+    }
+}
+
+impl From<rusqlite::Error> for DatabuildError {
+    fn from(err: rusqlite::Error) -> Self {
+        Self {
+            msg: err.to_string(),
+            source: Some(Box::new(err)),
+            backtrace: maybe_backtrace()
+        }
+    }
+}
+
+impl From<prost::EncodeError> for DatabuildError {
+    fn from(err: prost::EncodeError) -> Self {
+        Self {
+            msg: err.to_string(),
+            source: Some(Box::new(err)),
+            backtrace: maybe_backtrace()
+        }
+    }
+}
+
+impl From<String> for DatabuildError {
+    fn from(value: String) -> Self {
+        Self::new(value)
+    }
+}
+
+impl std::fmt::Display for DatabuildError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.msg)
+    }
+}
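DatabuildError above exists largely so that format!(...).into(), .ok_or(format!(...))?, and ? on io/rusqlite/prost errors all converge on one error type. A stripped-down, self-contained sketch of that conversion pattern (illustrative type name, not the crate's):

    #[derive(Debug)]
    struct AppError {
        msg: String,
    }

    // From<String> lets `format!(...).into()` and `.ok_or(format!(...))?` produce AppError.
    impl From<String> for AppError {
        fn from(msg: String) -> Self {
            Self { msg }
        }
    }

    // From<std::io::Error> lets `?` convert IO failures automatically.
    impl From<std::io::Error> for AppError {
        fn from(err: std::io::Error) -> Self {
            Self { msg: err.to_string() }
        }
    }

    fn find_job(id: &str) -> Result<u64, AppError> {
        let known = [("run-1", 7u64)];
        let value = known
            .iter()
            .find(|(k, _)| *k == id)
            .map(|(_, v)| *v)
            .ok_or(format!("Job run ID {} not found", id))?; // String -> AppError via From
        Ok(value)
    }

    fn read_len(path: &str) -> Result<usize, AppError> {
        let bytes = std::fs::read(path)?; // io::Error -> AppError via From
        Ok(bytes.len())
    }

    fn main() {
        assert_eq!(find_job("run-1").unwrap(), 7);
        assert!(find_job("run-2").is_err());
        assert!(read_len("/definitely/not/a/real/path").is_err());
    }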
docs/ideas/metadata.md (new file, 2 lines)
@@ -0,0 +1,2 @@
+
+It would be cool to have user-defined partition/want/job-run metadata, and allow querying of this metadata. Basic example: adding a `run_url` to a job or `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, and using querying over job runs downstream from a want to control parallelism down to the number-of-cores-used level.
docs/ideas/querying.md (new file, 16 lines)
@@ -0,0 +1,16 @@
+
+Querying seems to be a fundamental factor of the problem. For instance:
+
+- Upon canceling a want, canceling all wants it spawned, and the jobs attached to them.
+- Answering the question, "what in-progress job runs were spawned by this want?"
+- Answering, "why was this partition built?"
+- Answering, "what partitions needed to be built and jobs run to fulfill this want?"
+- Answering, "what jobs produce the partitions missed by this job run?"
+
+Let's start prefixing functions that should probably be mostly queries with `query_`.
+
+
+Notes on JTBDs and queries:
+
+- When a want is schedulable (query), map the requested partitions to the job runs that create them (query), and start them
+-
@@ -129,40 +129,6 @@ def parse_crate_specs(module_content):
             crates[package] = crate_info
 
     return crates
-    """Extract crate specifications from MODULE.bazel content."""
-    crates = {}
-
-    # Find all crate.spec() calls
-    spec_pattern = r'crate\.spec\(\s*(.*?)\s*\)'
-    specs = re.findall(spec_pattern, module_content, re.DOTALL)
-
-    for spec in specs:
-        # Parse the spec parameters
-        package_match = re.search(r'package\s*=\s*"([^"]+)"', spec)
-        version_match = re.search(r'version\s*=\s*"([^"]+)"', spec)
-        features_match = re.search(r'features\s*=\s*\[(.*?)\]', spec, re.DOTALL)
-        default_features_match = re.search(r'default_features\s*=\s*False', spec)
-
-        if package_match and version_match:
-            package = package_match.group(1)
-            version = version_match.group(1)
-
-            crate_info = {"version": version}
-
-            # Handle features
-            if features_match:
-                features_str = features_match.group(1)
-                features = [f.strip().strip('"') for f in features_str.split(',') if f.strip()]
-                if features:
-                    crate_info["features"] = features
-
-            # Handle default-features = false
-            if default_features_match:
-                crate_info["default-features"] = False
-
-            crates[package] = crate_info
-
-    return crates
 
 def generate_cargo_toml(crates, structure, project_name="databuild"):
     """Generate Cargo.toml content from parsed crates and project structure."""
@@ -170,7 +136,7 @@ def generate_cargo_toml(crates, structure, project_name="databuild"):
         f'[package]',
         f'name = "{project_name}"',
         f'version = "0.1.0"',
-        f'edition = "2021"',
+        f'edition = "2024"',
         f'',
         f'# Generated from MODULE.bazel for IDE support only',
         f'# Actual dependencies are managed by Bazel',