refactor orchestrator.poll_wants

Stuart Axelbrooke 2025-11-19 14:22:36 -08:00
parent 2cf778a07b
commit 9bdd435089
5 changed files with 125 additions and 102 deletions

View file

@ -318,43 +318,6 @@
]
}
},
"@@aspect_rules_ts+//ts:extensions.bzl%ext": {
"general": {
"bzlTransitiveDigest": "aVqwKoRPrSXO367SJABlye04kmpR/9VM2xiXB3nh3Ls=",
"usagesDigest": "aJyEQO7QJEiaQqFH9CN9MlsG06TiRjGBpQl5IfRHTe0=",
"recordedFileInputs": {
"@@//databuild/dashboard/package.json": "6bca4800c4e27564303a32c8565d96fc21e572756c07b8a21566bd1195f757b2"
},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"npm_typescript": {
"repoRuleId": "@@aspect_rules_ts+//ts/private:npm_repositories.bzl%http_archive_version",
"attributes": {
"bzlmod": true,
"version": "",
"version_from": "@@//databuild/dashboard:package.json",
"integrity": "",
"build_file": "@@aspect_rules_ts+//ts:BUILD.typescript",
"build_file_substitutions": {
"bazel_worker_version": "5.4.2",
"google_protobuf_version": "3.20.1"
},
"urls": [
"https://registry.npmjs.org/typescript/-/typescript-{}.tgz"
]
}
}
},
"recordedRepoMappingEntries": [
[
"aspect_rules_ts+",
"bazel_tools",
"bazel_tools"
]
]
}
},
"@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": {
"general": {
"bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=",

View file

@ -1,18 +1,17 @@
use crate::data_build_event::Event;
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::job_run::{DepMissJobRun, SubProcessBackend};
use crate::util::{current_timestamp, DatabuildError};
use crate::util::{DatabuildError, current_timestamp};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
WantDetail, WantStatusCode,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
PartitionRef, PartitionStatusCode, TaintCreateEventV1, TaintDeleteEventV1, TaintDetail,
WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
};
use rusqlite::types::FromSql;
use rusqlite::ToSql;
use rusqlite::types::FromSql;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error;
@ -113,7 +112,8 @@ impl BuildState {
}
}
self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
Ok(())
}
@ -150,8 +150,14 @@ impl BuildState {
} else {
// Partition doesn't exist yet, needs to be inserted
let want_ids = if let Some(jrid) = job_run_id {
let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
let job_run = self
.get_job_run(jrid)
.expect("Job run must exist for partition");
job_run
.servicing_wants
.iter()
.map(|wap| wap.want_id.clone())
.collect()
} else {
vec![]
};
@ -160,7 +166,9 @@ impl BuildState {
r#ref: Some(pref.clone()),
status: Some(status.into()),
last_updated_timestamp: Some(current_timestamp()),
job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
job_run_ids: job_run_id
.map(|jrid| vec![jrid.to_string()])
.unwrap_or(vec![]),
want_ids,
..PartitionDetail::default()
};
@ -205,7 +213,11 @@ impl BuildState {
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Update partitions being built by this job
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
self.update_partition_status(
&pref,
PartitionStatusCode::PartitionLive,
Some(&event.job_run_id),
)?;
}
Ok(())
}
@ -230,7 +242,11 @@ impl BuildState {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
self.update_partition_status(
&pref,
PartitionStatusCode::PartitionFailed,
Some(&event.job_run_id),
)?;
}
Ok(())
}
@ -354,7 +370,7 @@ impl BuildState {
}
}
pub fn schedulable_wants(&self) -> WantsSchedulability {
pub fn wants_schedulability(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
@ -424,9 +440,15 @@ pub struct WantSchedulability {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl Into<bool> for WantsSchedulability {
fn into(self) -> bool {
self.0.iter().all(|w| w.is_schedulable())
impl WantsSchedulability {
pub fn schedulable_wants(self) -> Vec<WantDetail> {
self.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect()
}
}
@ -484,7 +506,7 @@ mod tests {
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
@ -505,7 +527,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
@ -522,7 +544,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(!ws.is_schedulable());
}
@ -547,7 +569,7 @@ mod tests {
)]));
// Should...
let schedulability = state.schedulable_wants();
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}

View file

@ -2,6 +2,24 @@ syntax = "proto3";
package databuild.v1;
// Job Config
// Fully resolved configuration for a job run, aside from the partitions it should produce, which
// are passed as args
message JobConfig {
// The fully qualified, unique label representing the job
string label = 1;
// The command to run to launch the job
string entrypoint = 2;
// The environment variables to set for the job
map<string, string> environment = 3;
// A list of regex patterns that partitions must match to be considered for this job
repeated string partition_patterns = 4;
// TODO future fields to consider
// - timeout
// -
}
// Core Build Event Log (BEL)
message PartitionRef {

View file

@ -6,11 +6,11 @@ use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
SubProcessBackend,
};
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use crate::util::DatabuildError;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@ -126,6 +126,18 @@ struct GroupedWants {
unhandled_wants: Vec<WantDetail>,
}
impl GroupedWants {
pub fn validate(&self) -> Result<(), DatabuildError> {
if !self.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can handle them
// TODO we probably want to handle this gracefully in the near future
Err(format!("Unable to map following wants: {:?}", self.unhandled_wants).into())
} else {
Ok(())
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
@ -214,44 +226,17 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), DatabuildError> {
// Collect unhandled wants, group by job that handles each partition,
let schedulability = self.bel.state.schedulable_wants();
println!("schedulability: {:?}", schedulability);
let schedulable_wants = schedulability
.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect();
let schedulable_wants = self.bel.state.wants_schedulability().schedulable_wants();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
println!("grouped wants: {:?}", grouped_wants);
grouped_wants.validate()?;
if !grouped_wants.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can be handled
// TODO we probably want to handle this gracefully in the near future
Err(format!(
"Unable to map following wants: {:?}",
&grouped_wants.unhandled_wants
)
.into())
} else {
// Spawn jobs and add events
for wg in grouped_wants.want_groups {
let job_run = wg.spawn()?;
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_label: wg.job.label,
building_partitions: wg.wants.iter().map(|w| w.partitions.clone()).flatten().collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
self.queue_job(wg)?;
}
Ok(())
}
}
fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
@ -279,6 +264,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
// Spawn job run (not started, but need only be `.run`'d)
let job_run = wg.spawn()?;
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_label: wg.job.label,
building_partitions: wg
.wants
.iter()
.map(|w| w.partitions.clone())
.flatten()
.collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
Ok(())
}
fn step(&mut self) -> Result<(), DatabuildError> {
self.poll_job_runs()?;
self.poll_wants()?;
@ -325,12 +332,12 @@ fn run_complete_to_events(
#[cfg(test)]
mod tests {
use crate::WantCreateEventV1;
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use crate::WantCreateEventV1;
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
@ -412,9 +419,9 @@ mod tests {
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::WantCreateEventV1;
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::WantCreateEventV1;
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
@ -441,7 +448,7 @@ mod tests {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 0);
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
@ -449,7 +456,7 @@ mod tests {
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
// When
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 1);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll wants");
@ -614,7 +621,9 @@ mod tests {
// When
// Poll wants then schedule pending jobs
orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
orchestrator
.poll_wants()
.expect("stage unscheduled jobs based on wants failed");
assert_eq!(orchestrator.not_started_jobs.len(), 1);
// poll job runs should start job run
orchestrator.poll_job_runs().expect("should start run");
@ -622,14 +631,18 @@ mod tests {
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator.poll_job_runs().expect("should still be running");
orchestrator
.poll_job_runs()
.expect("should still be running");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
// Wait for it to complete
thread::sleep(Duration::from_millis(10));
orchestrator.poll_job_runs().expect("should be able to poll existing job run");
orchestrator
.poll_job_runs()
.expect("should be able to poll existing job run");
// Job run should have succeeded
assert!(orchestrator.not_started_jobs.is_empty());

View file

@ -1,2 +1,9 @@
It would be cool to have user-defined partition/want/job-run metadata, and to allow querying of this metadata. Basic example: adding a `run_url` to a job or an `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, then querying the job runs downstream of a want to control parallelism down to the number of cores used.
Taints could also be implemented as metadata, e.g. a `databuild.tainted_at` field that is simply set to the current time when a partition is tainted. This would involve a few endpoints:
1. Set partition metadata
2. Get partition metadata
The big question: do we need taint history, or metadata assignment history more generally? The temptation is YAGNI, but it may be worth thinking through here just to make sure.
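As a rough sketch only (none of this exists in the current API), the two endpoints could look something like the following on the Rust side. The type names (`SetPartitionMetadataRequest`, etc.) and the `taint_metadata` helper are hypothetical, loosely mirroring the existing `List*Request`/`List*Response` style:

```rust
use std::collections::BTreeMap;

// Hypothetical request/response shapes for partition metadata, mirroring the
// existing List*Request / List*Response naming. None of these types exist yet.
#[derive(Debug, Clone, Default)]
pub struct SetPartitionMetadataRequest {
    pub partition_ref: String,
    // e.g. {"adls_location": "...", "databuild.tainted_at": "1732050000"}
    pub metadata: BTreeMap<String, String>,
}

#[derive(Debug, Clone, Default)]
pub struct GetPartitionMetadataRequest {
    pub partition_ref: String,
    // Optional key filter; empty means return all metadata for the partition.
    pub keys: Vec<String>,
}

#[derive(Debug, Clone, Default)]
pub struct GetPartitionMetadataResponse {
    pub metadata: BTreeMap<String, String>,
}

// Taint-as-metadata helper: stamps the current time under a reserved key
// instead of introducing a dedicated taint event type.
pub fn taint_metadata(now_millis: u64) -> BTreeMap<String, String> {
    let mut m = BTreeMap::new();
    m.insert("databuild.tainted_at".to_string(), now_millis.to_string());
    m
}
```

Whether taint history then falls out for free would depend on whether metadata writes are appended to the BEL as events or applied as in-place updates.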