Compare commits

...

4 commits

Author SHA1 Message Date
eadd23eb63 Sketch out some API stuff 2025-11-20 01:03:00 -08:00
d42bddac90 add thoughts on getting to v2 e2e running 2025-11-19 17:39:02 -08:00
66ba40e2db minor refactor on poll_jobs 2025-11-19 17:18:18 -08:00
9bdd435089 refactor orchestrator.poll_wants 2025-11-19 14:22:36 -08:00
10 changed files with 365 additions and 182 deletions

View file

@ -318,43 +318,6 @@
]
}
},
"@@aspect_rules_ts+//ts:extensions.bzl%ext": {
"general": {
"bzlTransitiveDigest": "aVqwKoRPrSXO367SJABlye04kmpR/9VM2xiXB3nh3Ls=",
"usagesDigest": "aJyEQO7QJEiaQqFH9CN9MlsG06TiRjGBpQl5IfRHTe0=",
"recordedFileInputs": {
"@@//databuild/dashboard/package.json": "6bca4800c4e27564303a32c8565d96fc21e572756c07b8a21566bd1195f757b2"
},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"npm_typescript": {
"repoRuleId": "@@aspect_rules_ts+//ts/private:npm_repositories.bzl%http_archive_version",
"attributes": {
"bzlmod": true,
"version": "",
"version_from": "@@//databuild/dashboard:package.json",
"integrity": "",
"build_file": "@@aspect_rules_ts+//ts:BUILD.typescript",
"build_file_substitutions": {
"bazel_worker_version": "5.4.2",
"google_protobuf_version": "3.20.1"
},
"urls": [
"https://registry.npmjs.org/typescript/-/typescript-{}.tgz"
]
}
}
},
"recordedRepoMappingEntries": [
[
"aspect_rules_ts+",
"bazel_tools",
"bazel_tools"
]
]
}
},
"@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": {
"general": {
"bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=",

View file

@ -1,5 +1,5 @@
use crate::data_build_event::Event;
use crate::{DataBuildEvent, WantDetail};
use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
use prost::Message;
use rusqlite::Connection;
use std::error::Error;
@ -169,6 +169,57 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
let idx = self.storage.append_event(event)?;
Ok(idx)
}
// API methods
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
self.state.list_wants(&req)
}
pub fn api_handle_list_taints(&self, req: ListTaintsRequest) -> ListTaintsResponse {
self.state.list_taints(&req)
}
pub fn api_handle_list_partitions(&self, req: ListPartitionsRequest) -> ListPartitionsResponse {
self.state.list_partitions(&req)
}
pub fn api_handle_list_job_runs(&self, req: ListJobRunsRequest) -> ListJobRunsResponse {
self.state.list_job_runs(&req)
}
pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result<CreateWantResponse, DatabuildError> {
let ev: WantCreateEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
}
pub fn api_handle_want_get(&self, req: GetWantRequest) -> GetWantResponse {
self.state.get_want(&req.want_id).into()
}
pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result<CancelWantResponse, DatabuildError> {
let ev: WantCancelEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
}
pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result<CreateTaintResponse, DatabuildError> {
// TODO Need to do this hierarchically? A taint will impact downstream partitions also
todo!();
let ev: TaintCreateEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_taint(&ev.taint_id).into())
}
pub fn api_handle_taint_get(&self, req: GetTaintRequest) -> GetTaintResponse {
todo!()
}
// Not implemented yet
// pub fn api_handle_taint_cancel(&mut self, req: CancelWantRequest) -> CancelWantResponse {
// todo!()
// }
}
impl Clone for BuildEventLog<MemoryBELStorage> {

View file

@ -2,15 +2,7 @@ use crate::data_build_event::Event;
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::job_run::{DepMissJobRun, SubProcessBackend};
use crate::util::{current_timestamp, DatabuildError};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
WantDetail, WantStatusCode,
};
use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::FromSql;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize};
@ -77,7 +69,7 @@ impl BuildState {
Event::WantCancelV1(e) => self.handle_want_cancel(e),
// Taint events
Event::TaintCreateV1(e) => self.handle_taint_create(e),
Event::TaintDeleteV1(e) => self.handle_taint_delete(e),
Event::TaintCancelV1(e) => self.handle_taint_delete(e),
// Ruh roh!
_ => panic!("Unhandled event type! {:?}", event),
}
@ -90,6 +82,7 @@ impl BuildState {
}
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
@ -113,7 +106,8 @@ impl BuildState {
}
}
self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
Ok(())
}
@ -150,8 +144,14 @@ impl BuildState {
} else {
// Partition doesn't exist yet, needs to be inserted
let want_ids = if let Some(jrid) = job_run_id {
let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
let job_run = self
.get_job_run(jrid)
.expect("Job run must exist for partition");
job_run
.servicing_wants
.iter()
.map(|wap| wap.want_id.clone())
.collect()
} else {
vec![]
};
@ -160,7 +160,9 @@ impl BuildState {
r#ref: Some(pref.clone()),
status: Some(status.into()),
last_updated_timestamp: Some(current_timestamp()),
job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
job_run_ids: job_run_id
.map(|jrid| vec![jrid.to_string()])
.unwrap_or(vec![]),
want_ids,
..PartitionDetail::default()
};
@ -205,7 +207,11 @@ impl BuildState {
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Update partitions being built by this job
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
self.update_partition_status(
&pref,
PartitionStatusCode::PartitionLive,
Some(&event.job_run_id),
)?;
}
Ok(())
}
@ -230,7 +236,11 @@ impl BuildState {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
self.update_partition_status(
&pref,
PartitionStatusCode::PartitionFailed,
Some(&event.job_run_id),
)?;
}
Ok(())
}
@ -250,7 +260,7 @@ impl BuildState {
todo!("...?")
}
fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> {
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> {
todo!("...?")
}
@ -354,7 +364,7 @@ impl BuildState {
}
}
pub fn schedulable_wants(&self) -> WantsSchedulability {
pub fn wants_schedulability(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
@ -424,9 +434,15 @@ pub struct WantSchedulability {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl Into<bool> for WantsSchedulability {
fn into(self) -> bool {
self.0.iter().all(|w| w.is_schedulable())
impl WantsSchedulability {
pub fn schedulable_wants(self) -> Vec<WantDetail> {
self.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect()
}
}
@ -484,7 +500,7 @@ mod tests {
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
@ -505,7 +521,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
@ -522,7 +538,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(!ws.is_schedulable());
}
@ -547,7 +563,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}

View file

@ -2,6 +2,24 @@ syntax = "proto3";
package databuild.v1;
// Job Config
// Fully resolved configuration for a job run, aside from the partitions it should produce, which
// are passed as args
message JobConfig {
// The fully qualified, unique label representing the job
string label = 1;
// The command to run to launch the job
string entrypoint = 2;
// The environment variables to set for the job
map<string, string> environment = 3;
// A list of regex patterns that partitions must match to be considered for this job
repeated string partition_patterns = 4;
// TODO future fields to consider
// - timeout
// -
}
// Core Build Event Log (BEL)
message PartitionRef {
@ -26,7 +44,7 @@ message DataBuildEvent {
WantCancelEventV1 want_cancel_v1 = 18;
// Taint events
TaintCreateEventV1 taint_create_v1 = 19;
TaintDeleteEventV1 taint_delete_v1 = 20;
TaintCancelEventV1 taint_cancel_v1 = 20;
}
}
@ -137,7 +155,7 @@ message TaintCreateEventV1 {
EventSource source = 5;
optional string comment = 6;
}
message TaintDeleteEventV1 {
message TaintCancelEventV1 {
string taint_id = 1;
EventSource source = 2;
optional string comment = 3;
@ -199,7 +217,7 @@ message PartitionStatus {
string name = 2;
}
enum PartitionStatusCode {
// TODO how do we avoid copying job states here?
// TODO how do we avoid copying job states here? This is essentially a union of job states and taints?
PartitionUnknown = 0;
PartitionWanted = 1;
PartitionBuilding = 2;
@ -284,4 +302,54 @@ message ListJobRunsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
}
message CreateWantRequest {
repeated PartitionRef partitions = 1;
uint64 data_timestamp = 2;
uint64 ttl_seconds = 3;
uint64 sla_seconds = 4;
EventSource source = 5;
optional string comment = 6;
}
message CreateWantResponse {
WantDetail data = 1;
}
message CancelWantRequest {
string want_id = 1;
EventSource source = 2;
optional string comment = 3;
}
message CancelWantResponse {
WantDetail data = 1;
}
message GetWantRequest {
string want_id = 1;
}
message GetWantResponse {
WantDetail data = 1;
}
message CreateTaintRequest {
// TODO
}
message CreateTaintResponse {
// TODO
}
message GetTaintRequest {
// TODO
}
message GetTaintResponse {
// TODO
}
// Not implemented yet
//message CancelTaintRequest {
// // TODO
//}
//message CancelTaintResponse {
// // TODO
//}

View file

@ -1,6 +1,7 @@
use uuid::Uuid;
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
impl From<&WantCreateEventV1> for WantDetail {
@ -34,6 +35,16 @@ impl From<WantCancelEventV1> for Event {
Event::WantCancelV1(value)
}
}
impl From<TaintCreateEventV1> for Event {
fn from(value: TaintCreateEventV1) -> Self {
Event::TaintCreateV1(value)
}
}
impl From<TaintCancelEventV1> for Event {
fn from(value: TaintCancelEventV1) -> Self {
Event::TaintCancelV1(value)
}
}
impl From<WantCreateEventV1> for WantAttributedPartitions {
fn from(value: WantCreateEventV1) -> Self {
@ -146,4 +157,66 @@ impl From<&WantDetail> for WantAttributedPartitions {
partitions: value.partitions.clone(),
}
}
}
impl From<CreateWantRequest> for WantCreateEventV1 {
fn from(value: CreateWantRequest) -> Self {
WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: value.partitions,
data_timestamp: value.data_timestamp,
ttl_seconds: value.ttl_seconds,
sla_seconds: value.sla_seconds,
source: value.source,
comment: value.comment,
}
}
}
impl Into<CreateWantResponse> for Option<WantDetail> {
fn into(self) -> CreateWantResponse {
CreateWantResponse {
data: self,
}
}
}
impl Into<GetWantResponse> for Option<WantDetail> {
fn into(self) -> GetWantResponse {
GetWantResponse {
data: self,
}
}
}
impl From<CancelWantRequest> for WantCancelEventV1 {
fn from(value: CancelWantRequest) -> Self {
WantCancelEventV1 {
want_id: value.want_id,
source: value.source,
comment: value.comment,
}
}
}
impl Into<CancelWantResponse> for Option<WantDetail> {
fn into(self) -> CancelWantResponse {
CancelWantResponse {
data: self,
}
}
}
impl From<CreateTaintRequest> for TaintCreateEventV1 {
fn from(value: CreateTaintRequest) -> Self {
todo!()
}
}
impl Into<CreateTaintResponse> for Option<TaintDetail> {
fn into(self) -> CreateTaintResponse {
CreateTaintResponse {
// TODO
}
}
}

View file

@ -1,11 +1,12 @@
use crate::job_run::{NotStartedJobRun, SubProcessBackend};
use crate::{PartitionRef, WantDetail};
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use crate::util::DatabuildError;
#[derive(Debug, Clone)]
pub struct JobConfiguration {
pub label: String,
pub pattern: String,
pub patterns: Vec<String>,
pub entry_point: String,
}
@ -19,8 +20,25 @@ impl JobConfiguration {
}
pub fn matches(&self, refs: &PartitionRef) -> bool {
let regex =
Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern));
regex.is_match(&refs.r#ref)
self.patterns.iter().any(|pattern| {
let regex =
Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern));
regex.is_match(&refs.r#ref)
})
}
}
impl From<JobConfig> for JobConfiguration {
fn from(config: JobConfig) -> Self {
Self {
label: config.label,
patterns: config.partition_patterns,
entry_point: config.entrypoint,
}
}
}
pub fn parse_job_configuration(s: &str) -> Result<JobConfiguration, DatabuildError> {
let cfg: JobConfig = serde_json::from_str(s)?;
Ok(cfg.into())
}

View file

@ -2,15 +2,12 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
SubProcessBackend,
};
use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use crate::util::DatabuildError;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@ -126,6 +123,18 @@ struct GroupedWants {
unhandled_wants: Vec<WantDetail>,
}
impl GroupedWants {
pub fn validate(&self) -> Result<(), DatabuildError> {
if !self.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can be handled
// TODO we probably want to handle this gracefully in the near future
Err(format!("Unable to map following wants: {:?}", self.unhandled_wants).into())
} else {
Ok(())
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
@ -139,27 +148,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
fn job_runs_count(&self) -> usize {
self.not_started_jobs.len()
+ self.running_jobs.len()
+ self.completed_jobs.len()
+ self.failed_jobs.len()
+ self.dep_miss_jobs.len()
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
// TODO need to incorporate concurrency limit/pools here, probably?
while let Some(job) = self.not_started_jobs.pop() {
self.running_jobs.push(job.run()?);
}
Ok(())
}
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers.
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::JobRunVisitResult;
// Coherence check setup
let total_runs_count = self.job_runs_count();
// First, start any not-started jobs
while let Some(job) = self.not_started_jobs.pop() {
let running = job.run()?;
self.running_jobs.push(running);
}
self.schedule_queued_jobs()?;
// Visit running jobs and transition them to terminal states
let mut still_running = Vec::new();
@ -175,11 +176,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
JobRunVisitResult::Completed(completed) => {
// Emit success event
println!("Completed job: {:?}", completed.id());
let result = run_complete_to_events(&self.bel.state, &completed)?;
for event in result.events {
self.bel.append_event(&event)?;
}
// Move job to completed
self.bel.append_event(&completed.state.to_event(&completed.id()))?;
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
@ -194,63 +191,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
self.bel.append_event(&event)?;
}
// Record missing upstream status in want details
self.dep_miss_jobs.push(dep_miss);
}
}
}
self.running_jobs = still_running;
// Panic because this should never happen
assert_eq!(
self.job_runs_count(),
total_runs_count,
"Detected job run count change during job run visit (should never happen)"
);
Ok(())
}
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), DatabuildError> {
// Collect unhandled wants, group by job that handles each partition,
let schedulability = self.bel.state.schedulable_wants();
println!("schedulability: {:?}", schedulability);
let schedulable_wants = schedulability
.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect();
let schedulable_wants = self.bel.state.wants_schedulability().schedulable_wants();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
println!("grouped wants: {:?}", grouped_wants);
grouped_wants.validate()?;
if !grouped_wants.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can be handled
// TODO we probably want to handle this gracefully in the near future
Err(format!(
"Unable to map following wants: {:?}",
&grouped_wants.unhandled_wants
)
.into())
} else {
// Spawn jobs and add events
for wg in grouped_wants.want_groups {
let job_run = wg.spawn()?;
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_label: wg.job.label,
building_partitions: wg.wants.iter().map(|w| w.partitions.clone()).flatten().collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
}
Ok(())
// Spawn jobs and add events
for wg in grouped_wants.want_groups {
self.queue_job(wg)?;
}
Ok(())
}
fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
@ -279,6 +241,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
// Spawn job run (not started, but need only be `.run`'d)
let job_run = wg.spawn()?;
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_label: wg.job.label,
building_partitions: wg
.wants
.iter()
.map(|w| w.partitions.clone())
.flatten()
.collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
Ok(())
}
fn step(&mut self) -> Result<(), DatabuildError> {
self.poll_job_runs()?;
self.poll_wants()?;
@ -293,44 +277,14 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
#[derive(Default, Clone, Debug)]
pub struct JobRunCompleteResult {
/// Events to append to the BEL from this job completing
pub events: Vec<Event>,
}
/// Handle successful run completion:
/// - Adding run success event
/// - Updating status for partitions actually built by the job
fn run_complete_to_events(
bel_state: &BuildState,
completed: &CompletedJobRun<SubProcessBackend>,
) -> Result<JobRunCompleteResult, DatabuildError> {
let mut events = vec![
// Event marking completion of job
completed.state.to_event(&completed.id()),
];
// let job_detail = bel_state
// .get_job_run(&completed.job_run_id.to_string())
// .ok_or(format!(
// "No job run found for id `{}`",
// completed.job_run_id
// ))?;
Ok(JobRunCompleteResult {
// built_partitions: job_detail.building_partitions,
events,
})
}
#[cfg(test)]
mod tests {
use crate::WantCreateEventV1;
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use crate::WantCreateEventV1;
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
@ -363,12 +317,12 @@ mod tests {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
pattern: "data/alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: MockJobRun::bin_path(),
},
JobConfiguration {
label: "beta".to_string(),
pattern: "data/beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: MockJobRun::bin_path(),
},
],
@ -412,9 +366,9 @@ mod tests {
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::WantCreateEventV1;
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::WantCreateEventV1;
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
@ -441,7 +395,7 @@ mod tests {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 0);
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
@ -449,7 +403,7 @@ mod tests {
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
// When
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 1);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll wants");
@ -614,7 +568,9 @@ mod tests {
// When
// Poll wants then schedule pending jobs
orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
orchestrator
.poll_wants()
.expect("stage unscheduled jobs based on wants failed");
assert_eq!(orchestrator.not_started_jobs.len(), 1);
// poll job runs should start job run
orchestrator.poll_job_runs().expect("should start run");
@ -622,14 +578,18 @@ mod tests {
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator.poll_job_runs().expect("should still be running");
orchestrator
.poll_job_runs()
.expect("should still be running");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
// Wait for it to complete
thread::sleep(Duration::from_millis(10));
orchestrator.poll_job_runs().expect("should be able to poll existing job run");
orchestrator
.poll_job_runs()
.expect("should be able to poll existing job run");
// Job run should have succeeded
assert!(orchestrator.not_started_jobs.is_empty());
@ -675,7 +635,7 @@ mod tests {
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration {
label: label.to_string(),
pattern: pattern.to_string(),
patterns: vec![pattern.to_string()],
entry_point: "test_entrypoint".to_string(),
}
}

View file

@ -62,6 +62,16 @@ impl From<prost::EncodeError> for DatabuildError {
}
}
impl From<serde_json::Error> for DatabuildError {
fn from(err: serde_json::Error) -> Self {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
}
}
}
impl From<String> for DatabuildError {
fn from(value: String) -> Self {
Self::new(value)

View file

@ -1,2 +1,9 @@
It would be cool to have user-defined partition/want/job-run metadata, and to allow querying of that metadata. Basic example: adding a `run_url` to a job run or an `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, then querying the job runs downstream of a want to control parallelism down to the number of cores used.
Taints could also be implemented as metadata, e.g. a `databuild.tainted_at` field that is simply set to the current time when a partition is tainted. This would involve a few endpoints:
1. Set partition metadata
2. Get partition metadata
The big question: do we need taint history, or metadata assignment history? The temptation is YAGNI, but it may be worth sketching out here just to make sure.
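As a purely illustrative sketch of the two endpoints above (nothing below exists in the codebase; the type names, fields, and use of plain strings for partition refs are all assumptions), the shapes might simply mirror the existing `*Request`/`*Response` messages:

```rust
use std::collections::HashMap;

// Hypothetical message shapes for the two metadata endpoints, mirroring the
// existing request/response pattern. Names and fields are illustrative only.
#[derive(Clone, Debug, Default)]
pub struct SetPartitionMetadataRequest {
    /// The partition to attach metadata to, e.g. "data/alpha".
    pub partition_ref: String,
    /// Arbitrary user-defined key/value pairs, e.g. "adls_location" or "databuild.tainted_at".
    pub metadata: HashMap<String, String>,
}

#[derive(Clone, Debug, Default)]
pub struct GetPartitionMetadataRequest {
    pub partition_ref: String,
}

#[derive(Clone, Debug, Default)]
pub struct GetPartitionMetadataResponse {
    pub metadata: HashMap<String, String>,
}
```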

View file

@ -0,0 +1,17 @@
We've gotten to a place where the orchestrator reacts to wants, runs jobs, and turns dep misses into other wants, which in theory is all we need from it to get to v2. So, what do we need around that?
- Job configuration
- Want submission - more generally, the API
How do those break down into work items?
### Job Configuration
The databuild binary should accept a runtime config that either points to a job config file or provides the config inline, and that populates the [`JobConfigurations`](databuild/orchestrator.rs:99). This should be fairly straightforward, and can probably just be deserialization of the file contents via [the protobuf struct](databuild/databuild.proto:8).
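A minimal sketch of what that could look like, assuming the config file is just the JSON encoding of `JobConfig` and that `parse_job_configuration` stays as written in job.rs; the JSON keys assume serde mirrors the proto field names, and the label, entrypoint, and pattern values are made up:

```rust
use crate::job::{parse_job_configuration, JobConfiguration};
use crate::util::DatabuildError;

// Sketch: read the JSON form of the JobConfig proto and turn it into the
// orchestrator's JobConfiguration. In the real binary `raw` would come from
// the file referenced by the runtime config rather than a string literal.
fn load_example_job_configuration() -> Result<JobConfiguration, DatabuildError> {
    let raw = r#"{
        "label": "//databuild/examples:alpha",
        "entrypoint": "bazel run //databuild/examples:alpha --",
        "environment": {"LOG_LEVEL": "info"},
        "partition_patterns": ["data/alpha/.*"]
    }"#;
    parse_job_configuration(raw)
}
```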
### API
We have the beginnings of the API endpoints described as protobuf messages ([starting here](databuild/databuild.proto:174)). We now need to build an API server around them, and a CLI that can talk to that API. This breaks down into the following layers (a rough sketch of the HTTP layer follows the list):
- Build state accessor functions - transact via the protobuf structs
- API - exposes build state accessor functions via HTTP
- CLI - talks to the API, formats requests/responses, and can start the API server for a databuild workspace
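A minimal sketch of the HTTP layer, under a couple of stated assumptions: an axum-style server (axum is not currently a dependency), the prost-generated request/response types gaining serde derives so they can pass through `Json`, and the BEL being shared behind an `Arc<Mutex<...>>`. Routes, error handling, and names are illustrative, not a committed design:

```rust
use std::sync::{Arc, Mutex};

use axum::{extract::State, routing::post, Json, Router};

use crate::build_event_log::{BuildEventLog, MemoryBELStorage};
use crate::{CreateWantRequest, CreateWantResponse, ListWantsRequest, ListWantsResponse};

type SharedBel = Arc<Mutex<BuildEventLog<MemoryBELStorage>>>;

// Thin HTTP wrappers over the build state accessor functions on BuildEventLog.
async fn list_wants(
    State(bel): State<SharedBel>,
    Json(req): Json<ListWantsRequest>,
) -> Json<ListWantsResponse> {
    Json(bel.lock().unwrap().api_handle_list_wants(req))
}

async fn create_want(
    State(bel): State<SharedBel>,
    Json(req): Json<CreateWantRequest>,
) -> Json<CreateWantResponse> {
    // Error handling elided; a real handler would map DatabuildError to an HTTP status.
    Json(bel.lock().unwrap().api_handle_want_create(req).unwrap())
}

pub fn api_router(bel: SharedBel) -> Router {
    Router::new()
        .route("/v1/wants/list", post(list_wants))
        .route("/v1/wants", post(create_want))
        .with_state(bel)
}
```

The CLI layer would then just be a thin client that serializes the same request messages, calls these routes, and formats the responses.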