Compare commits
4 commits
2cf778a07b
...
eadd23eb63
| Author | SHA1 | Date | |
|---|---|---|---|
| eadd23eb63 | |||
| d42bddac90 | |||
| 66ba40e2db | |||
| 9bdd435089 |
10 changed files with 365 additions and 182 deletions
|
|
@ -318,43 +318,6 @@
|
|||
]
|
||||
}
|
||||
},
|
||||
"@@aspect_rules_ts+//ts:extensions.bzl%ext": {
|
||||
"general": {
|
||||
"bzlTransitiveDigest": "aVqwKoRPrSXO367SJABlye04kmpR/9VM2xiXB3nh3Ls=",
|
||||
"usagesDigest": "aJyEQO7QJEiaQqFH9CN9MlsG06TiRjGBpQl5IfRHTe0=",
|
||||
"recordedFileInputs": {
|
||||
"@@//databuild/dashboard/package.json": "6bca4800c4e27564303a32c8565d96fc21e572756c07b8a21566bd1195f757b2"
|
||||
},
|
||||
"recordedDirentsInputs": {},
|
||||
"envVariables": {},
|
||||
"generatedRepoSpecs": {
|
||||
"npm_typescript": {
|
||||
"repoRuleId": "@@aspect_rules_ts+//ts/private:npm_repositories.bzl%http_archive_version",
|
||||
"attributes": {
|
||||
"bzlmod": true,
|
||||
"version": "",
|
||||
"version_from": "@@//databuild/dashboard:package.json",
|
||||
"integrity": "",
|
||||
"build_file": "@@aspect_rules_ts+//ts:BUILD.typescript",
|
||||
"build_file_substitutions": {
|
||||
"bazel_worker_version": "5.4.2",
|
||||
"google_protobuf_version": "3.20.1"
|
||||
},
|
||||
"urls": [
|
||||
"https://registry.npmjs.org/typescript/-/typescript-{}.tgz"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"recordedRepoMappingEntries": [
|
||||
[
|
||||
"aspect_rules_ts+",
|
||||
"bazel_tools",
|
||||
"bazel_tools"
|
||||
]
|
||||
]
|
||||
}
|
||||
},
|
||||
"@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": {
|
||||
"general": {
|
||||
"bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::data_build_event::Event;
|
||||
use crate::{DataBuildEvent, WantDetail};
|
||||
use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
|
||||
use prost::Message;
|
||||
use rusqlite::Connection;
|
||||
use std::error::Error;
|
||||
|
|
@ -169,6 +169,57 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
|
|||
let idx = self.storage.append_event(event)?;
|
||||
Ok(idx)
|
||||
}
|
||||
|
||||
|
||||
// API methods
|
||||
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
|
||||
self.state.list_wants(&req)
|
||||
}
|
||||
|
||||
pub fn api_handle_list_taints(&self, req: ListTaintsRequest) -> ListTaintsResponse {
|
||||
self.state.list_taints(&req)
|
||||
}
|
||||
|
||||
pub fn api_handle_list_partitions(&self, req: ListPartitionsRequest) -> ListPartitionsResponse {
|
||||
self.state.list_partitions(&req)
|
||||
}
|
||||
|
||||
pub fn api_handle_list_job_runs(&self, req: ListJobRunsRequest) -> ListJobRunsResponse {
|
||||
self.state.list_job_runs(&req)
|
||||
}
|
||||
|
||||
pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result<CreateWantResponse, DatabuildError> {
|
||||
let ev: WantCreateEventV1 = req.into();
|
||||
self.append_event(&ev.clone().into())?;
|
||||
Ok(self.state.get_want(&ev.want_id).into())
|
||||
}
|
||||
|
||||
pub fn api_handle_want_get(&self, req: GetWantRequest) -> GetWantResponse {
|
||||
self.state.get_want(&req.want_id).into()
|
||||
}
|
||||
|
||||
pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result<CancelWantResponse, DatabuildError> {
|
||||
let ev: WantCancelEventV1 = req.into();
|
||||
self.append_event(&ev.clone().into())?;
|
||||
Ok(self.state.get_want(&ev.want_id).into())
|
||||
}
|
||||
|
||||
pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result<CreateTaintResponse, DatabuildError> {
|
||||
// TODO Need to do this hierarchically? A taint will impact downstream partitions also
|
||||
todo!();
|
||||
let ev: TaintCreateEventV1 = req.into();
|
||||
self.append_event(&ev.clone().into())?;
|
||||
Ok(self.state.get_taint(&ev.taint_id).into())
|
||||
}
|
||||
|
||||
pub fn api_handle_taint_get(&self, req: GetTaintRequest) -> GetTaintResponse {
|
||||
todo!()
|
||||
}
|
||||
|
||||
// Not implemented yet
|
||||
// pub fn api_handle_taint_cancel(&mut self, req: CancelWantRequest) -> CancelWantResponse {
|
||||
// todo!()
|
||||
// }
|
||||
}
|
||||
|
||||
impl Clone for BuildEventLog<MemoryBELStorage> {
|
||||
|
|
|
|||
|
|
@ -2,15 +2,7 @@ use crate::data_build_event::Event;
|
|||
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
|
||||
use crate::job_run::{DepMissJobRun, SubProcessBackend};
|
||||
use crate::util::{current_timestamp, DatabuildError};
|
||||
use crate::{
|
||||
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
|
||||
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
|
||||
JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
|
||||
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
|
||||
ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
|
||||
TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
|
||||
WantDetail, WantStatusCode,
|
||||
};
|
||||
use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
|
||||
use rusqlite::types::FromSql;
|
||||
use rusqlite::ToSql;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
@ -77,7 +69,7 @@ impl BuildState {
|
|||
Event::WantCancelV1(e) => self.handle_want_cancel(e),
|
||||
// Taint events
|
||||
Event::TaintCreateV1(e) => self.handle_taint_create(e),
|
||||
Event::TaintDeleteV1(e) => self.handle_taint_delete(e),
|
||||
Event::TaintCancelV1(e) => self.handle_taint_delete(e),
|
||||
// Ruh roh!
|
||||
_ => panic!("Unhandled event type! {:?}", event),
|
||||
}
|
||||
|
|
@ -90,6 +82,7 @@ impl BuildState {
|
|||
}
|
||||
|
||||
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
|
||||
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
|
||||
if let Some(want) = self.wants.get_mut(&event.want_id) {
|
||||
want.status = Some(WantStatusCode::WantCanceled.into());
|
||||
want.last_updated_timestamp = current_timestamp();
|
||||
|
|
@ -113,7 +106,8 @@ impl BuildState {
|
|||
}
|
||||
}
|
||||
|
||||
self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
|
||||
self.job_runs
|
||||
.insert(event.job_run_id.clone(), job_run.clone());
|
||||
println!("Inserted job run: {:?}", job_run);
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -150,8 +144,14 @@ impl BuildState {
|
|||
} else {
|
||||
// Partition doesn't exist yet, needs to be inserted
|
||||
let want_ids = if let Some(jrid) = job_run_id {
|
||||
let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
|
||||
job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
|
||||
let job_run = self
|
||||
.get_job_run(jrid)
|
||||
.expect("Job run must exist for partition");
|
||||
job_run
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|wap| wap.want_id.clone())
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
|
@ -160,7 +160,9 @@ impl BuildState {
|
|||
r#ref: Some(pref.clone()),
|
||||
status: Some(status.into()),
|
||||
last_updated_timestamp: Some(current_timestamp()),
|
||||
job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
|
||||
job_run_ids: job_run_id
|
||||
.map(|jrid| vec![jrid.to_string()])
|
||||
.unwrap_or(vec![]),
|
||||
want_ids,
|
||||
..PartitionDetail::default()
|
||||
};
|
||||
|
|
@ -205,7 +207,11 @@ impl BuildState {
|
|||
let job_run = self.get_job_run(&event.job_run_id).unwrap();
|
||||
// Update partitions being build by this job
|
||||
for pref in job_run.building_partitions {
|
||||
self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
|
||||
self.update_partition_status(
|
||||
&pref,
|
||||
PartitionStatusCode::PartitionLive,
|
||||
Some(&event.job_run_id),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -230,7 +236,11 @@ impl BuildState {
|
|||
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
|
||||
let job_run = self.get_job_run(&event.job_run_id).unwrap();
|
||||
for pref in job_run.building_partitions {
|
||||
self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
|
||||
self.update_partition_status(
|
||||
&pref,
|
||||
PartitionStatusCode::PartitionFailed,
|
||||
Some(&event.job_run_id),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -250,7 +260,7 @@ impl BuildState {
|
|||
todo!("...?")
|
||||
}
|
||||
|
||||
fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> {
|
||||
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> {
|
||||
todo!("...?")
|
||||
}
|
||||
|
||||
|
|
@ -354,7 +364,7 @@ impl BuildState {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn schedulable_wants(&self) -> WantsSchedulability {
|
||||
pub fn wants_schedulability(&self) -> WantsSchedulability {
|
||||
WantsSchedulability(
|
||||
self.wants
|
||||
.values()
|
||||
|
|
@ -424,9 +434,15 @@ pub struct WantSchedulability {
|
|||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
|
||||
|
||||
impl Into<bool> for WantsSchedulability {
|
||||
fn into(self) -> bool {
|
||||
self.0.iter().all(|w| w.is_schedulable())
|
||||
impl WantsSchedulability {
|
||||
pub fn schedulable_wants(self) -> Vec<WantDetail> {
|
||||
self.0
|
||||
.iter()
|
||||
.filter_map(|ws| match ws.is_schedulable() {
|
||||
false => None,
|
||||
true => Some(ws.want.clone()),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -484,7 +500,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_empty_wants_noop() {
|
||||
assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
|
||||
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
|
||||
}
|
||||
|
||||
// A want with satisfied upstreams (incl "none") should be schedulable
|
||||
|
|
@ -505,7 +521,7 @@ mod tests {
|
|||
)]));
|
||||
|
||||
// Should...
|
||||
let schedulability = state.schedulable_wants();
|
||||
let schedulability = state.wants_schedulability();
|
||||
let ws = schedulability.0.first().unwrap();
|
||||
assert!(ws.is_schedulable());
|
||||
}
|
||||
|
|
@ -522,7 +538,7 @@ mod tests {
|
|||
)]));
|
||||
|
||||
// Should...
|
||||
let schedulability = state.schedulable_wants();
|
||||
let schedulability = state.wants_schedulability();
|
||||
let ws = schedulability.0.first().unwrap();
|
||||
assert!(!ws.is_schedulable());
|
||||
}
|
||||
|
|
@ -547,7 +563,7 @@ mod tests {
|
|||
)]));
|
||||
|
||||
// Should...
|
||||
let schedulability = state.schedulable_wants();
|
||||
let schedulability = state.wants_schedulability();
|
||||
let ws = schedulability.0.first().unwrap();
|
||||
assert!(ws.is_schedulable());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,24 @@ syntax = "proto3";
|
|||
|
||||
package databuild.v1;
|
||||
|
||||
// Job Config
|
||||
// Fully resolved configuration for a job run, aside from the partitions it should produce, which
|
||||
// are passed as args
|
||||
message JobConfig {
|
||||
// The fully qualified, unique label representing the job
|
||||
string label = 1;
|
||||
// The command to run to launch the job
|
||||
string entrypoint = 2;
|
||||
// The environment variables to set for the job
|
||||
map<string, string> environment = 3;
|
||||
// A list of regex patterns that partitions must match to be considered for this job
|
||||
repeated string partition_patterns = 4;
|
||||
// TODO future fields to consider
|
||||
// - timeout
|
||||
// -
|
||||
}
|
||||
|
||||
|
||||
// Core Build Event Log (BEL)
|
||||
|
||||
message PartitionRef {
|
||||
|
|
@ -26,7 +44,7 @@ message DataBuildEvent {
|
|||
WantCancelEventV1 want_cancel_v1 = 18;
|
||||
// Taint events
|
||||
TaintCreateEventV1 taint_create_v1 = 19;
|
||||
TaintDeleteEventV1 taint_delete_v1 = 20;
|
||||
TaintCancelEventV1 taint_cancel_v1 = 20;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -137,7 +155,7 @@ message TaintCreateEventV1 {
|
|||
EventSource source = 5;
|
||||
optional string comment = 6;
|
||||
}
|
||||
message TaintDeleteEventV1 {
|
||||
message TaintCancelEventV1 {
|
||||
string taint_id = 1;
|
||||
EventSource source = 2;
|
||||
optional string comment = 3;
|
||||
|
|
@ -199,7 +217,7 @@ message PartitionStatus {
|
|||
string name = 2;
|
||||
}
|
||||
enum PartitionStatusCode {
|
||||
// TODO how do we avoid copying job states here?
|
||||
// TODO how do we avoid copying job states here? This is essentially a union of job states and taints?
|
||||
PartitionUnknown = 0;
|
||||
PartitionWanted = 1;
|
||||
PartitionBuilding = 2;
|
||||
|
|
@ -285,3 +303,53 @@ message ListJobRunsResponse {
|
|||
uint64 page = 3;
|
||||
uint64 page_size = 4;
|
||||
}
|
||||
|
||||
message CreateWantRequest {
|
||||
repeated PartitionRef partitions = 1;
|
||||
uint64 data_timestamp = 2;
|
||||
uint64 ttl_seconds = 3;
|
||||
uint64 sla_seconds = 4;
|
||||
EventSource source = 5;
|
||||
optional string comment = 6;
|
||||
}
|
||||
message CreateWantResponse {
|
||||
WantDetail data = 1;
|
||||
}
|
||||
|
||||
message CancelWantRequest {
|
||||
string want_id = 1;
|
||||
EventSource source = 2;
|
||||
optional string comment = 3;
|
||||
}
|
||||
message CancelWantResponse {
|
||||
WantDetail data = 1;
|
||||
}
|
||||
|
||||
message GetWantRequest {
|
||||
string want_id = 1;
|
||||
}
|
||||
message GetWantResponse {
|
||||
WantDetail data = 1;
|
||||
}
|
||||
|
||||
message CreateTaintRequest {
|
||||
// TODO
|
||||
}
|
||||
message CreateTaintResponse {
|
||||
// TODO
|
||||
}
|
||||
|
||||
message GetTaintRequest {
|
||||
// TODO
|
||||
}
|
||||
message GetTaintResponse {
|
||||
// TODO
|
||||
}
|
||||
|
||||
// Not implemented yet
|
||||
//message CancelTaintRequest {
|
||||
// // TODO
|
||||
//}
|
||||
//message CancelTaintResponse {
|
||||
// // TODO
|
||||
//}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use uuid::Uuid;
|
||||
use crate::data_build_event::Event;
|
||||
use crate::util::current_timestamp;
|
||||
use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
|
||||
use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
|
||||
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
|
||||
|
||||
impl From<&WantCreateEventV1> for WantDetail {
|
||||
|
|
@ -34,6 +35,16 @@ impl From<WantCancelEventV1> for Event {
|
|||
Event::WantCancelV1(value)
|
||||
}
|
||||
}
|
||||
impl From<TaintCreateEventV1> for Event {
|
||||
fn from(value: TaintCreateEventV1) -> Self {
|
||||
Event::TaintCreateV1(value)
|
||||
}
|
||||
}
|
||||
impl From<TaintCancelEventV1> for Event {
|
||||
fn from(value: TaintCancelEventV1) -> Self {
|
||||
Event::TaintCancelV1(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WantCreateEventV1> for WantAttributedPartitions {
|
||||
fn from(value: WantCreateEventV1) -> Self {
|
||||
|
|
@ -147,3 +158,65 @@ impl From<&WantDetail> for WantAttributedPartitions {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CreateWantRequest> for WantCreateEventV1 {
|
||||
fn from(value: CreateWantRequest) -> Self {
|
||||
WantCreateEventV1 {
|
||||
want_id: Uuid::new_v4().into(),
|
||||
partitions: value.partitions,
|
||||
data_timestamp: value.data_timestamp,
|
||||
ttl_seconds: value.ttl_seconds,
|
||||
sla_seconds: value.sla_seconds,
|
||||
source: value.source,
|
||||
comment: value.comment,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<CreateWantResponse> for Option<WantDetail> {
|
||||
fn into(self) -> CreateWantResponse {
|
||||
CreateWantResponse {
|
||||
data: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<GetWantResponse> for Option<WantDetail> {
|
||||
fn into(self) -> GetWantResponse {
|
||||
GetWantResponse {
|
||||
data: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CancelWantRequest> for WantCancelEventV1 {
|
||||
fn from(value: CancelWantRequest) -> Self {
|
||||
WantCancelEventV1 {
|
||||
want_id: value.want_id,
|
||||
source: value.source,
|
||||
comment: value.comment,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<CancelWantResponse> for Option<WantDetail> {
|
||||
fn into(self) -> CancelWantResponse {
|
||||
CancelWantResponse {
|
||||
data: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CreateTaintRequest> for TaintCreateEventV1 {
|
||||
fn from(value: CreateTaintRequest) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<CreateTaintResponse> for Option<TaintDetail> {
|
||||
fn into(self) -> CreateTaintResponse {
|
||||
CreateTaintResponse {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +1,12 @@
|
|||
use crate::job_run::{NotStartedJobRun, SubProcessBackend};
|
||||
use crate::{PartitionRef, WantDetail};
|
||||
use crate::{JobConfig, PartitionRef, WantDetail};
|
||||
use regex::Regex;
|
||||
use crate::util::DatabuildError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JobConfiguration {
|
||||
pub label: String,
|
||||
pub pattern: String,
|
||||
pub patterns: Vec<String>,
|
||||
pub entry_point: String,
|
||||
}
|
||||
|
||||
|
|
@ -19,8 +20,25 @@ impl JobConfiguration {
|
|||
}
|
||||
|
||||
pub fn matches(&self, refs: &PartitionRef) -> bool {
|
||||
self.patterns.iter().any(|pattern| {
|
||||
let regex =
|
||||
Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern));
|
||||
Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern));
|
||||
regex.is_match(&refs.r#ref)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JobConfig> for JobConfiguration {
|
||||
fn from(config: JobConfig) -> Self {
|
||||
Self {
|
||||
label: config.label,
|
||||
patterns: config.partition_patterns,
|
||||
entry_point: config.entrypoint,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_job_configuration(s: &str) -> Result<JobConfiguration, DatabuildError> {
|
||||
let cfg: JobConfig = serde_json::from_str(s)?;
|
||||
Ok(cfg.into())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,15 +2,12 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
|
|||
use crate::build_state::BuildState;
|
||||
use crate::data_build_event::Event;
|
||||
use crate::job::JobConfiguration;
|
||||
use crate::job_run::{
|
||||
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
|
||||
SubProcessBackend,
|
||||
};
|
||||
use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend};
|
||||
use crate::util::DatabuildError;
|
||||
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::fmt::Debug;
|
||||
use crate::util::DatabuildError;
|
||||
|
||||
/**
|
||||
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
|
||||
|
|
@ -126,6 +123,18 @@ struct GroupedWants {
|
|||
unhandled_wants: Vec<WantDetail>,
|
||||
}
|
||||
|
||||
impl GroupedWants {
|
||||
pub fn validate(&self) -> Result<(), DatabuildError> {
|
||||
if !self.unhandled_wants.is_empty() {
|
||||
// All wants must be mapped to jobs that can be handled
|
||||
// TODO we probably want to handle this gracefully in the near future
|
||||
Err(format!("Unable to map following wants: {:?}", self.unhandled_wants).into())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: BELStorage + Debug> Orchestrator<S> {
|
||||
fn new(storage: S, config: OrchestratorConfig) -> Self {
|
||||
Self {
|
||||
|
|
@ -139,27 +148,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
|||
}
|
||||
}
|
||||
|
||||
fn job_runs_count(&self) -> usize {
|
||||
self.not_started_jobs.len()
|
||||
+ self.running_jobs.len()
|
||||
+ self.completed_jobs.len()
|
||||
+ self.failed_jobs.len()
|
||||
+ self.dep_miss_jobs.len()
|
||||
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
|
||||
// TODO need to incorporate concurrency limit/pools here, probably?
|
||||
while let Some(job) = self.not_started_jobs.pop() {
|
||||
self.running_jobs.push(job.run()?);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Visits individual job runs, appending resulting events, and moving runs between run status
|
||||
/// containers.
|
||||
/// containers. Either jobs are still running, or they are moved to terminal states.
|
||||
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
|
||||
use crate::job_run::JobRunVisitResult;
|
||||
|
||||
// Coherence check setup
|
||||
let total_runs_count = self.job_runs_count();
|
||||
|
||||
// First, start any not-started jobs
|
||||
while let Some(job) = self.not_started_jobs.pop() {
|
||||
let running = job.run()?;
|
||||
self.running_jobs.push(running);
|
||||
}
|
||||
self.schedule_queued_jobs()?;
|
||||
|
||||
// Visit running jobs and transition them to terminal states
|
||||
let mut still_running = Vec::new();
|
||||
|
|
@ -175,11 +176,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
|||
JobRunVisitResult::Completed(completed) => {
|
||||
// Emit success event
|
||||
println!("Completed job: {:?}", completed.id());
|
||||
let result = run_complete_to_events(&self.bel.state, &completed)?;
|
||||
for event in result.events {
|
||||
self.bel.append_event(&event)?;
|
||||
}
|
||||
// Move job to completed
|
||||
self.bel.append_event(&completed.state.to_event(&completed.id()))?;
|
||||
self.completed_jobs.push(completed);
|
||||
}
|
||||
JobRunVisitResult::Failed(failed) => {
|
||||
|
|
@ -194,64 +191,29 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
|||
for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
|
||||
self.bel.append_event(&event)?;
|
||||
}
|
||||
// Record missing upstream status in want details
|
||||
self.dep_miss_jobs.push(dep_miss);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.running_jobs = still_running;
|
||||
|
||||
// Panic because this should never happen
|
||||
assert_eq!(
|
||||
self.job_runs_count(),
|
||||
total_runs_count,
|
||||
"Detected job run count change during job run visit (should never happen)"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/** Continuously invoked function to watch wants and schedule new jobs */
|
||||
fn poll_wants(&mut self) -> Result<(), DatabuildError> {
|
||||
// Collect unhandled wants, group by job that handles each partition,
|
||||
let schedulability = self.bel.state.schedulable_wants();
|
||||
println!("schedulability: {:?}", schedulability);
|
||||
let schedulable_wants = schedulability
|
||||
.0
|
||||
.iter()
|
||||
.filter_map(|ws| match ws.is_schedulable() {
|
||||
false => None,
|
||||
true => Some(ws.want.clone()),
|
||||
})
|
||||
.collect();
|
||||
let schedulable_wants = self.bel.state.wants_schedulability().schedulable_wants();
|
||||
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
|
||||
println!("grouped wants: {:?}", grouped_wants);
|
||||
grouped_wants.validate()?;
|
||||
|
||||
if !grouped_wants.unhandled_wants.is_empty() {
|
||||
// All wants must be mapped to jobs that can be handled
|
||||
// TODO we probably want to handle this gracefully in the near future
|
||||
Err(format!(
|
||||
"Unable to map following wants: {:?}",
|
||||
&grouped_wants.unhandled_wants
|
||||
)
|
||||
.into())
|
||||
} else {
|
||||
// Spawn jobs and add events
|
||||
for wg in grouped_wants.want_groups {
|
||||
let job_run = wg.spawn()?;
|
||||
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: job_run.job_run_id.into(),
|
||||
job_label: wg.job.label,
|
||||
building_partitions: wg.wants.iter().map(|w| w.partitions.clone()).flatten().collect(),
|
||||
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
|
||||
});
|
||||
self.bel.append_event(&job_buffer_event)?;
|
||||
self.not_started_jobs.push(job_run);
|
||||
self.queue_job(wg)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
|
||||
let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
|
||||
|
|
@ -279,6 +241,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
|||
}
|
||||
}
|
||||
|
||||
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
|
||||
// Spawn job run (not started, but need only be `.run`'d)
|
||||
let job_run = wg.spawn()?;
|
||||
|
||||
// Create job run buffer event
|
||||
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: job_run.job_run_id.into(),
|
||||
job_label: wg.job.label,
|
||||
building_partitions: wg
|
||||
.wants
|
||||
.iter()
|
||||
.map(|w| w.partitions.clone())
|
||||
.flatten()
|
||||
.collect(),
|
||||
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
|
||||
});
|
||||
self.bel.append_event(&job_buffer_event)?;
|
||||
self.not_started_jobs.push(job_run);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn step(&mut self) -> Result<(), DatabuildError> {
|
||||
self.poll_job_runs()?;
|
||||
self.poll_wants()?;
|
||||
|
|
@ -293,44 +277,14 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub struct JobRunCompleteResult {
|
||||
/// Events to append to the BEL from this job completing
|
||||
pub events: Vec<Event>,
|
||||
}
|
||||
|
||||
/// Handle successful run completion:
|
||||
/// - Adding run success event
|
||||
/// - Updating status for partitions actually built by the job
|
||||
fn run_complete_to_events(
|
||||
bel_state: &BuildState,
|
||||
completed: &CompletedJobRun<SubProcessBackend>,
|
||||
) -> Result<JobRunCompleteResult, DatabuildError> {
|
||||
let mut events = vec![
|
||||
// Event marking completion of job
|
||||
completed.state.to_event(&completed.id()),
|
||||
];
|
||||
// let job_detail = bel_state
|
||||
// .get_job_run(&completed.job_run_id.to_string())
|
||||
// .ok_or(format!(
|
||||
// "No job run found for id `{}`",
|
||||
// completed.job_run_id
|
||||
// ))?;
|
||||
|
||||
Ok(JobRunCompleteResult {
|
||||
// built_partitions: job_detail.building_partitions,
|
||||
events,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::WantCreateEventV1;
|
||||
use crate::build_event_log::MemoryBELStorage;
|
||||
use crate::job::JobConfiguration;
|
||||
use crate::mock_job_run::MockJobRun;
|
||||
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
|
||||
use crate::util::current_timestamp;
|
||||
use crate::WantCreateEventV1;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
|
||||
|
|
@ -363,12 +317,12 @@ mod tests {
|
|||
jobs: vec![
|
||||
JobConfiguration {
|
||||
label: "alpha".to_string(),
|
||||
pattern: "data/alpha".to_string(),
|
||||
patterns: vec!["data/alpha".to_string()],
|
||||
entry_point: MockJobRun::bin_path(),
|
||||
},
|
||||
JobConfiguration {
|
||||
label: "beta".to_string(),
|
||||
pattern: "data/beta".to_string(),
|
||||
patterns: vec!["data/beta".to_string()],
|
||||
entry_point: MockJobRun::bin_path(),
|
||||
},
|
||||
],
|
||||
|
|
@ -412,9 +366,9 @@ mod tests {
|
|||
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
|
||||
// created by itself (for dep miss job run failures)
|
||||
mod poll_wants {
|
||||
use crate::WantCreateEventV1;
|
||||
use crate::data_build_event::Event;
|
||||
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
|
||||
use crate::WantCreateEventV1;
|
||||
|
||||
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
|
||||
#[test]
|
||||
|
|
@ -441,7 +395,7 @@ mod tests {
|
|||
partitions: vec!["data/alpha".into()],
|
||||
..WantCreateEventV1::sample()
|
||||
})];
|
||||
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
|
||||
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 0);
|
||||
for e in events {
|
||||
orchestrator.bel.append_event(&e).expect("append");
|
||||
}
|
||||
|
|
@ -449,7 +403,7 @@ mod tests {
|
|||
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
|
||||
|
||||
// When
|
||||
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
|
||||
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 1);
|
||||
orchestrator
|
||||
.poll_wants()
|
||||
.expect("shouldn't fail to poll wants");
|
||||
|
|
@ -614,7 +568,9 @@ mod tests {
|
|||
|
||||
// When
|
||||
// Poll wants then schedule pending jobs
|
||||
orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
|
||||
orchestrator
|
||||
.poll_wants()
|
||||
.expect("stage unscheduled jobs based on wants failed");
|
||||
assert_eq!(orchestrator.not_started_jobs.len(), 1);
|
||||
// poll job runs should start job run
|
||||
orchestrator.poll_job_runs().expect("should start run");
|
||||
|
|
@ -622,14 +578,18 @@ mod tests {
|
|||
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
// Should still be running after 1ms
|
||||
orchestrator.poll_job_runs().expect("should still be running");
|
||||
orchestrator
|
||||
.poll_job_runs()
|
||||
.expect("should still be running");
|
||||
assert_eq!(orchestrator.running_jobs.len(), 1);
|
||||
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
|
||||
println!("STATE: {:?}", orchestrator.bel.state);
|
||||
|
||||
// Wait for it to complete
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
orchestrator.poll_job_runs().expect("should be able to poll existing job run");
|
||||
orchestrator
|
||||
.poll_job_runs()
|
||||
.expect("should be able to poll existing job run");
|
||||
|
||||
// Job run should have succeeded
|
||||
assert!(orchestrator.not_started_jobs.is_empty());
|
||||
|
|
@ -675,7 +635,7 @@ mod tests {
|
|||
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
|
||||
JobConfiguration {
|
||||
label: label.to_string(),
|
||||
pattern: pattern.to_string(),
|
||||
patterns: vec![pattern.to_string()],
|
||||
entry_point: "test_entrypoint".to_string(),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,16 @@ impl From<prost::EncodeError> for DatabuildError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for DatabuildError {
|
||||
fn from(err: serde_json::Error) -> Self {
|
||||
Self {
|
||||
msg: err.to_string(),
|
||||
source: Some(Box::new(err)),
|
||||
backtrace: maybe_backtrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for DatabuildError {
|
||||
fn from(value: String) -> Self {
|
||||
Self::new(value)
|
||||
|
|
|
|||
|
|
@ -1,2 +1,9 @@
|
|||
|
||||
It would be cool to have user-defined partition/want/job-run metadata, and allow querying of this metadata. Basic example: adding a `run_url` to a job or `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, and using querying over job runs downstream from a want to control parallelism down to the number-of-cores-used level.
|
||||
|
||||
Also, taints could be implemented as metadata also, e.g. a `databuild.tainted_at` field that is just set to the current time upon tainting a partition. This would involve a few endpoints:
|
||||
|
||||
1. Set partition metadata
|
||||
2. Get partition metadata
|
||||
|
||||
Big question is, do we need taint history? Or metadata assignment history? Temptation is YAGNI, but may be worth imagining here just to make sure.
|
||||
17
docs/impl_notes/2025-11-19_running-jobs.md
Normal file
17
docs/impl_notes/2025-11-19_running-jobs.md
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
|
||||
We've gotten to a place where the orchestrator reacts to wants, runs jobs, and turns dep misses into other wants, which in theory is all we need from it to get to v2. So, what do we need around that?
|
||||
|
||||
- Job configuration
|
||||
- Want submission - more generally, the API
|
||||
|
||||
How do those break down into work items?
|
||||
|
||||
### Job Configuration
|
||||
The databuild binary should accept a runtime config that points to a job config file, or just provides it directly, which populates the [`JobConfigurations`](databuild/orchestrator.rs:99). This should be fairly straightforward, and can probably just be deserialization of the read string via [the protobuf struct](databuild/databuild.proto:8).
|
||||
|
||||
### API
|
||||
We have the (beginnings of) API endpoints described in protobuf structs ([starting here](databuild/databuild.proto:174)). We just need to build an API server around it, and a CLI that can talk to the API. This would break down into the following layers:
|
||||
|
||||
- Build state accessor functions - transact via the protobuf structs
|
||||
- API - exposes build state accessor functions via HTTP
|
||||
- CLI - Talks to API, formats requests/responses, is able to start API server for databuild workspace
|
||||
Loading…
Reference in a new issue