diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock
index 6db906f..d459191 100644
--- a/MODULE.bazel.lock
+++ b/MODULE.bazel.lock
@@ -318,43 +318,6 @@
         ]
       }
     },
-    "@@aspect_rules_ts+//ts:extensions.bzl%ext": {
-      "general": {
-        "bzlTransitiveDigest": "aVqwKoRPrSXO367SJABlye04kmpR/9VM2xiXB3nh3Ls=",
-        "usagesDigest": "aJyEQO7QJEiaQqFH9CN9MlsG06TiRjGBpQl5IfRHTe0=",
-        "recordedFileInputs": {
-          "@@//databuild/dashboard/package.json": "6bca4800c4e27564303a32c8565d96fc21e572756c07b8a21566bd1195f757b2"
-        },
-        "recordedDirentsInputs": {},
-        "envVariables": {},
-        "generatedRepoSpecs": {
-          "npm_typescript": {
-            "repoRuleId": "@@aspect_rules_ts+//ts/private:npm_repositories.bzl%http_archive_version",
-            "attributes": {
-              "bzlmod": true,
-              "version": "",
-              "version_from": "@@//databuild/dashboard:package.json",
-              "integrity": "",
-              "build_file": "@@aspect_rules_ts+//ts:BUILD.typescript",
-              "build_file_substitutions": {
-                "bazel_worker_version": "5.4.2",
-                "google_protobuf_version": "3.20.1"
-              },
-              "urls": [
-                "https://registry.npmjs.org/typescript/-/typescript-{}.tgz"
-              ]
-            }
-          }
-        },
-        "recordedRepoMappingEntries": [
-          [
-            "aspect_rules_ts+",
-            "bazel_tools",
-            "bazel_tools"
-          ]
-        ]
-      }
-    },
     "@@rules_kotlin+//src/main/starlark/core/repositories:bzlmod_setup.bzl%rules_kotlin_extensions": {
       "general": {
         "bzlTransitiveDigest": "OlvsB0HsvxbR8ZN+J9Vf00X/+WVz/Y/5Xrq2LgcVfdo=",
diff --git a/databuild/build_state.rs b/databuild/build_state.rs
index b4e7137..2a87667 100644
--- a/databuild/build_state.rs
+++ b/databuild/build_state.rs
@@ -1,18 +1,17 @@
 use crate::data_build_event::Event;
-use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
+use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
 use crate::job_run::{DepMissJobRun, SubProcessBackend};
-use crate::util::{current_timestamp, DatabuildError};
+use crate::util::{DatabuildError, current_timestamp};
 use crate::{
     JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
-    JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
-    JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
-    ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
-    ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
-    TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
-    WantDetail, WantStatusCode,
+    JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
+    ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
+    ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
+    PartitionRef, PartitionStatusCode, TaintCreateEventV1, TaintDeleteEventV1, TaintDetail,
+    WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
 };
-use rusqlite::types::FromSql;
 use rusqlite::ToSql;
+use rusqlite::types::FromSql;
 use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::error::Error;
@@ -113,7 +112,8 @@ impl BuildState {
             }
         }
 
-        self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
+        self.job_runs
+            .insert(event.job_run_id.clone(), job_run.clone());
         println!("Inserted job run: {:?}", job_run);
         Ok(())
     }
@@ -150,8 +150,14 @@ impl BuildState {
         } else {
             // Partition doesn't exist yet, needs to be inserted
             let want_ids = if let Some(jrid) = job_run_id {
-                let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
-                job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
+                let job_run = self
+                    .get_job_run(jrid)
+                    .expect("Job run must exist for partition");
+                job_run
+                    .servicing_wants
+                    .iter()
+                    .map(|wap| wap.want_id.clone())
+                    .collect()
             } else {
                 vec![]
             };
@@ -160,7 +166,9 @@ impl BuildState {
                 r#ref: Some(pref.clone()),
                 status: Some(status.into()),
                 last_updated_timestamp: Some(current_timestamp()),
-                job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
+                job_run_ids: job_run_id
+                    .map(|jrid| vec![jrid.to_string()])
+                    .unwrap_or(vec![]),
                 want_ids,
                 ..PartitionDetail::default()
             };
@@ -205,7 +213,11 @@ impl BuildState {
         let job_run = self.get_job_run(&event.job_run_id).unwrap();
         // Update partitions being build by this job
         for pref in job_run.building_partitions {
-            self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
+            self.update_partition_status(
+                &pref,
+                PartitionStatusCode::PartitionLive,
+                Some(&event.job_run_id),
+            )?;
         }
         Ok(())
     }
@@ -230,7 +242,11 @@ impl BuildState {
         self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
         let job_run = self.get_job_run(&event.job_run_id).unwrap();
         for pref in job_run.building_partitions {
-            self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
+            self.update_partition_status(
+                &pref,
+                PartitionStatusCode::PartitionFailed,
+                Some(&event.job_run_id),
+            )?;
         }
         Ok(())
     }
@@ -354,7 +370,7 @@ impl BuildState {
         }
     }
 
-    pub fn schedulable_wants(&self) -> WantsSchedulability {
+    pub fn wants_schedulability(&self) -> WantsSchedulability {
         WantsSchedulability(
             self.wants
                 .values()
@@ -424,9 +440,15 @@ pub struct WantSchedulability {
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct WantsSchedulability(pub Vec<WantSchedulability>);
 
-impl Into<bool> for WantsSchedulability {
-    fn into(self) -> bool {
-        self.0.iter().all(|w| w.is_schedulable())
+impl WantsSchedulability {
+    pub fn schedulable_wants(self) -> Vec<WantDetail> {
+        self.0
+            .iter()
+            .filter_map(|ws| match ws.is_schedulable() {
+                false => None,
+                true => Some(ws.want.clone()),
+            })
+            .collect()
     }
 }
 
@@ -484,7 +506,7 @@ mod tests {
 
     #[test]
    fn test_empty_wants_noop() {
-        assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
+        assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
     }
 
     // A want with satisfied upstreams (incl "none") should be schedulable
@@ -505,7 +527,7 @@
         )]));
 
         // Should...
-        let schedulability = state.schedulable_wants();
+        let schedulability = state.wants_schedulability();
         let ws = schedulability.0.first().unwrap();
         assert!(ws.is_schedulable());
     }
@@ -522,7 +544,7 @@
         )]));
 
         // Should...
-        let schedulability = state.schedulable_wants();
+        let schedulability = state.wants_schedulability();
         let ws = schedulability.0.first().unwrap();
         assert!(!ws.is_schedulable());
     }
@@ -547,7 +569,7 @@
         )]));
 
         // Should...
-        let schedulability = state.schedulable_wants();
+        let schedulability = state.wants_schedulability();
         let ws = schedulability.0.first().unwrap();
         assert!(ws.is_schedulable());
     }
diff --git a/databuild/databuild.proto b/databuild/databuild.proto
index fdc2bac..da63a8b 100644
--- a/databuild/databuild.proto
+++ b/databuild/databuild.proto
@@ -2,6 +2,24 @@ syntax = "proto3";
 
 package databuild.v1;
 
+// Job Config
+// Fully resolved configuration for a job run, aside from the partitions it should produce, which
+// are passed as args
+message JobConfig {
+  // The fully qualified, unique label representing the job
+  string label = 1;
+  // The command to run to launch the job
+  string entrypoint = 2;
+  // The environment variables to set for the job
+  map<string, string> environment = 3;
+  // A list of regex patterns that partitions must match to be considered for this job
+  repeated string partition_patterns = 4;
+  // TODO future fields to consider
+  // - timeout
+  // -
+}
+
+
 // Core Build Event Log (BEL)
 
 message PartitionRef {
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index 411942c..3207080 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -6,11 +6,11 @@ use crate::job_run::{
     CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
     SubProcessBackend,
 };
+use crate::util::DatabuildError;
 use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
-use crate::util::DatabuildError;
 
 /**
 Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@@ -126,6 +126,18 @@ struct GroupedWants {
     unhandled_wants: Vec<WantDetail>,
 }
 
+impl GroupedWants {
+    pub fn validate(&self) -> Result<(), DatabuildError> {
+        if !self.unhandled_wants.is_empty() {
+            // All wants must be mapped to jobs that can be handled
+            // TODO we probably want to handle this gracefully in the near future
+            Err(format!("Unable to map following wants: {:?}", self.unhandled_wants).into())
+        } else {
+            Ok(())
+        }
+    }
+}
+
 impl Orchestrator {
     fn new(storage: S, config: OrchestratorConfig) -> Self {
         Self {
@@ -214,43 +226,16 @@ impl Orchestrator {
     /** Continuously invoked function to watch wants and schedule new jobs */
     fn poll_wants(&mut self) -> Result<(), DatabuildError> {
         // Collect unhandled wants, group by job that handles each partition,
-        let schedulability = self.bel.state.schedulable_wants();
-        println!("schedulability: {:?}", schedulability);
-        let schedulable_wants = schedulability
-            .0
-            .iter()
-            .filter_map(|ws| match ws.is_schedulable() {
-                false => None,
-                true => Some(ws.want.clone()),
-            })
-            .collect();
+        let schedulable_wants = self.bel.state.wants_schedulability().schedulable_wants();
         let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
-        println!("grouped wants: {:?}", grouped_wants);
+        grouped_wants.validate()?;
 
-        if !grouped_wants.unhandled_wants.is_empty() {
-            // All wants must be mapped to jobs that can be handled
-            // TODO we probably want to handle this gracefully in the near future
-            Err(format!(
-                "Unable to map following wants: {:?}",
-                &grouped_wants.unhandled_wants
-            )
-            .into())
-        } else {
-            // Spawn jobs and add events
-            for wg in grouped_wants.want_groups {
-                let job_run = wg.spawn()?;
-                let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
-                    job_run_id: job_run.job_run_id.into(),
-                    job_label: wg.job.label,
-                    building_partitions: wg.wants.iter().map(|w| w.partitions.clone()).flatten().collect(),
-                    want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
-                });
-                self.bel.append_event(&job_buffer_event)?;
-                self.not_started_jobs.push(job_run);
-            }
-
-            Ok(())
+        // Spawn jobs and add events
+        for wg in grouped_wants.want_groups {
+            self.queue_job(wg)?;
         }
+
+        Ok(())
     }
 
     fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
@@ -279,6 +264,28 @@ impl Orchestrator {
         }
     }
 
+    fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
+        // Spawn job run (not started, but need only be `.run`'d)
+        let job_run = wg.spawn()?;
+
+        // Create job run buffer event
+        let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
+            job_run_id: job_run.job_run_id.into(),
+            job_label: wg.job.label,
+            building_partitions: wg
+                .wants
+                .iter()
+                .map(|w| w.partitions.clone())
+                .flatten()
+                .collect(),
+            want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
+        });
+        self.bel.append_event(&job_buffer_event)?;
+        self.not_started_jobs.push(job_run);
+
+        Ok(())
+    }
+
     fn step(&mut self) -> Result<(), DatabuildError> {
         self.poll_job_runs()?;
         self.poll_wants()?;
@@ -325,12 +332,12 @@ fn run_complete_to_events(
 
 #[cfg(test)]
 mod tests {
+    use crate::WantCreateEventV1;
     use crate::build_event_log::MemoryBELStorage;
     use crate::job::JobConfiguration;
     use crate::mock_job_run::MockJobRun;
     use crate::orchestrator::{Orchestrator, OrchestratorConfig};
     use crate::util::current_timestamp;
-    use crate::WantCreateEventV1;
     use uuid::Uuid;
 
     fn build_orchestrator() -> Orchestrator {
@@ -412,9 +419,9 @@ mod tests {
     // The orchestrator polls wants so that it can react to new wants created by users, or to wants
     // created by itself (for dep miss job run failures)
     mod poll_wants {
+        use crate::WantCreateEventV1;
         use crate::data_build_event::Event;
         use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
-        use crate::WantCreateEventV1;
 
         // Use case: Empty schedulable wants is a valid case, and should create no new jobs.
         #[test]
@@ -441,7 +448,7 @@
                 partitions: vec!["data/alpha".into()],
                 ..WantCreateEventV1::sample()
             })];
-            assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
+            assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 0);
             for e in events {
                 orchestrator.bel.append_event(&e).expect("append");
             }
@@ -449,7 +456,7 @@
             assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
 
             // When
-            assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
+            assert_eq!(orchestrator.bel.state.wants_schedulability().0.len(), 1);
             orchestrator
                 .poll_wants()
                 .expect("shouldn't fail to poll wants");
@@ -614,7 +621,9 @@
 
         // When
         // Poll wants then schedule pending jobs
-        orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
+        orchestrator
+            .poll_wants()
+            .expect("stage unscheduled jobs based on wants failed");
         assert_eq!(orchestrator.not_started_jobs.len(), 1);
         // poll job runs should start job run
         orchestrator.poll_job_runs().expect("should start run");
@@ -622,14 +631,18 @@
         assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
         thread::sleep(Duration::from_millis(1));
         // Should still be running after 1ms
-        orchestrator.poll_job_runs().expect("should still be running");
+        orchestrator
+            .poll_job_runs()
+            .expect("should still be running");
         assert_eq!(orchestrator.running_jobs.len(), 1);
         assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
         println!("STATE: {:?}", orchestrator.bel.state);
 
         // Wait for it to complete
         thread::sleep(Duration::from_millis(10));
-        orchestrator.poll_job_runs().expect("should be able to poll existing job run");
+        orchestrator
+            .poll_job_runs()
+            .expect("should be able to poll existing job run");
 
         // Job run should have succeeded
         assert!(orchestrator.not_started_jobs.is_empty());
diff --git a/docs/ideas/metadata.md b/docs/ideas/metadata.md
index 4023419..b26f78a 100644
--- a/docs/ideas/metadata.md
+++ b/docs/ideas/metadata.md
@@ -1,2 +1,9 @@
 It would be cool to have user-defined partition/want/job-run metadata, and allow querying of this metadata. Basic example: adding a `run_url` to a job or `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, and using querying over job runs downstream from a want to control parallelism down to the number-of-cores-used level.
 
+
+Taints could also be implemented as metadata, e.g. a `databuild.tainted_at` field that is simply set to the current time upon tainting a partition. This would involve a few endpoints:
+
+1. Set partition metadata
+2. Get partition metadata
+
+The big question is: do we need taint history, or metadata assignment history? The temptation is YAGNI, but it may be worth imagining here just to make sure.
\ No newline at end of file