Compare commits


22 commits

Author SHA1 Message Date
2cf778a07b big bump 2025-11-16 22:21:56 -08:00
5361e295e0 impl test_job_fail_want_mapping 2025-10-20 09:04:22 -07:00
75ef722a2c impl test_schedulable_want_no_matching_job 2025-10-20 08:06:06 -07:00
bbeceaa015 add tests for want propagation, update dep read/miss reporting 2025-10-20 07:51:50 -07:00
1bca863be1 impl want propagation in orchestrator 2025-10-16 22:28:42 -07:00
3f223829bb impl dep miss in job run 2025-10-16 20:30:11 -07:00
aa2106ad8c impl dep miss state in job runs 2025-10-16 19:55:50 -07:00
2cd2ce7f7d impl log line parsing 2025-10-16 19:36:04 -07:00
9559a410d3 impl log line parsing 2025-10-16 19:33:18 -07:00
cb580f83eb lay groundwork for log line parsing 2025-10-16 19:27:42 -07:00
1f4138ecc0 little tweak to job run constructor 2025-10-16 18:53:51 -07:00
eeb90d0386 refactor top level job run interface to represent job states as states 2025-10-16 18:47:23 -07:00
6572d4e3bd refactor job run state to separate state types 2025-10-16 18:19:24 -07:00
cfcb201285 implement job canceling 2025-10-16 17:54:30 -07:00
7debea96a2 implement simple job succeeds and job fails tests for job run 2025-10-14 20:37:03 -07:00
fa5a5fa200 integrate test bin and get failing first job run test 2025-10-14 20:08:35 -07:00
d7fb2323d8 add more job run details 2025-10-14 19:54:03 -07:00
ea85af4d2b add test binary tests 2025-10-14 19:32:15 -07:00
bea9616227 add test binary 2025-10-13 19:49:48 -07:00
873f766aa0 add job run basics 2025-10-13 19:38:12 -07:00
bc61d8f530 partition schedulability 2025-10-13 13:29:04 -07:00
ac567240ea Add design notes for build state 2025-10-12 13:44:25 -07:00
20 changed files with 2351 additions and 515 deletions


@@ -3,15 +3,21 @@ module(
version = "0.1",
)
bazel_dep(name = "bazel_skylib", version = "1.8.1")
bazel_dep(name = "platforms", version = "0.0.11")
bazel_dep(name = "rules_shell", version = "0.4.0")
bazel_dep(name = "bazel_skylib", version = "1.8.2")
bazel_dep(name = "platforms", version = "1.0.0")
bazel_dep(name = "rules_shell", version = "0.6.1")
bazel_dep(name = "rules_oci", version = "2.2.6")
bazel_dep(name = "aspect_bazel_lib", version = "2.14.0")
bazel_dep(name = "rules_rust", version = "0.61.0")
bazel_dep(name = "rules_rust", version = "0.67.0")
bazel_dep(name = "rules_proto", version = "7.0.2")
bazel_dep(name = "protobuf", version = "29.0", repo_name = "com_google_protobuf")
#rust = use_extension("@rules_rust//rust:extensions.bzl", "rust")
#rust.toolchain(
# edition = "2024",
# versions = ["1.91.1"],
#)
crate = use_extension("@rules_rust//crate_universe:extensions.bzl", "crate")
crate.spec(
features = ["derive"],

File diff suppressed because one or more lines are too long


@@ -39,6 +39,8 @@ rust_library(
rust_test(
name = "databuild_test",
crate = ":databuild",
data = ["//databuild/test:test_job_helper"],
env = {"RUST_BACKTRACE": "1"},
)
# Legacy filegroup for backwards compatibility


@@ -6,18 +6,18 @@ use std::error::Error;
use std::fmt::Debug;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::build_state::BuildState;
use crate::util::current_timestamp;
use crate::util::{current_timestamp, DatabuildError};
pub trait BELStorage {
fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>>;
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
fn list_events(
&self,
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
) -> Result<Vec<DataBuildEvent>, DatabuildError>;
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MemoryBELStorage {
pub events: Vec<DataBuildEvent>,
}
@@ -35,7 +35,7 @@ impl MemoryBELStorage {
}
impl BELStorage for MemoryBELStorage {
fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let timestamp = current_timestamp();
let dbe = DataBuildEvent {
timestamp,
@@ -50,7 +50,7 @@ impl BELStorage for MemoryBELStorage {
&self,
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
) -> Result<Vec<DataBuildEvent>, DatabuildError> {
Ok(self
.events
.iter()
@@ -67,7 +67,7 @@ struct SqliteBELStorage {
}
impl SqliteBELStorage {
fn create(database_url: &str) -> Result<SqliteBELStorage, Box<dyn Error>> {
fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
let connection = Connection::open(database_url)?;
// Create the events table
@@ -85,7 +85,7 @@ impl SqliteBELStorage {
}
impl BELStorage for SqliteBELStorage {
fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let now = SystemTime::now();
let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
let timestamp = duration_since_epoch.as_nanos() as u64;
@@ -113,7 +113,7 @@ impl BELStorage for SqliteBELStorage {
&self,
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
) -> Result<Vec<DataBuildEvent>, DatabuildError> {
let mut stmt = self.connection.prepare(
"SELECT event_id, timestamp, event_data FROM events
WHERE timestamp > ?1
@@ -153,7 +153,7 @@ impl BELStorage for SqliteBELStorage {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: BuildState,
@@ -164,106 +164,108 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
BuildEventLog { storage, state }
}
pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
self.state.handle_event(&event);
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
self.state.handle_event(&event)?;
let idx = self.storage.append_event(event)?;
Ok(idx)
}
}
pub fn schedulable_wants(&self) -> Vec<WantDetail> {
todo!()
impl Clone for BuildEventLog<MemoryBELStorage> {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
state: self.state.clone(),
}
}
}
mod tests {
use uuid::Uuid;
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::data_build_event::Event;
use crate::{PartitionRef, WantCreateEventV1};
use crate::build_state::BuildState;
mod sqlite_bel_storage {
use uuid::Uuid;
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::data_build_event::Event;
use crate::{PartitionRef, WantCreateEventV1};
use crate::build_state::BuildState;
#[test]
fn test_hello() {
assert_eq!(2 + 3, 5);
}
#[test]
fn test_sqlite_append_event() {
let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let state = BuildState::default();
let mut log = BuildEventLog { storage, state };
#[test]
fn test_sqlite_append_event() {
let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let state = BuildState::default();
let mut log = BuildEventLog { storage, state };
let want_id = "sqlite_test_1234".to_string();
let want_id = "sqlite_test_1234".to_string();
// Initial state - verify storage is empty
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 0);
// Initial state - verify storage is empty
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 0);
// Verify want doesn't exist in state
assert!(log.state.get_want(&want_id).is_none());
// Verify want doesn't exist in state
assert!(log.state.get_want(&want_id).is_none());
// Append an event
let mut e = WantCreateEventV1::default();
e.want_id = want_id.clone();
e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
let event_id = log
.append_event(&Event::WantCreateV1(e))
.expect("append_event failed");
// Append an event
let mut e = WantCreateEventV1::default();
e.want_id = want_id.clone();
e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
let event_id = log
.append_event(&Event::WantCreateV1(e))
.expect("append_event failed");
// Verify event was stored
assert!(event_id > 0);
// Verify event was stored
assert!(event_id > 0);
// Verify event can be retrieved
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 1);
// Verify event can be retrieved
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 1);
let stored_event = &events[0];
assert_eq!(stored_event.event_id, event_id);
assert!(stored_event.timestamp > 0);
let stored_event = &events[0];
assert_eq!(stored_event.event_id, event_id);
assert!(stored_event.timestamp > 0);
// Verify the event content
if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
assert_eq!(want_event.want_id, want_id);
assert_eq!(want_event.partitions.len(), 1);
assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
} else {
panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
}
// Verify the event content
if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
assert_eq!(want_event.want_id, want_id);
assert_eq!(want_event.partitions.len(), 1);
assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
} else {
panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
// Verify state was updated
assert!(
log.state.get_want(&want_id).is_some(),
"want_id not found in state"
);
assert_eq!(
log.state.get_want(&want_id)
.map(|want| want.want_id.clone())
.expect("state.wants want_id not found"),
want_id,
"want_id not equal in state",
);
let mut e2 = WantCreateEventV1::default();
e2.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
let mut e3 = WantCreateEventV1::default();
e3.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
let mut e4 = WantCreateEventV1::default();
e4.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 4);
}
// Verify state was updated
assert!(
log.state.get_want(&want_id).is_some(),
"want_id not found in state"
);
assert_eq!(
log.state.get_want(&want_id)
.map(|want| want.want_id.clone())
.expect("state.wants want_id not found"),
want_id,
"want_id not equal in state",
);
let mut e2 = WantCreateEventV1::default();
e2.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
let mut e3 = WantCreateEventV1::default();
e3.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
let mut e4 = WantCreateEventV1::default();
e4.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
let events = log
.storage
.list_events(0, 100)
.expect("Failed to list events");
assert_eq!(events.len(), 4);
}
}


@@ -1,13 +1,44 @@
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::{Connection, ToSql};
use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::job_run::{DepMissJobRun, SubProcessBackend};
use crate::util::{current_timestamp, DatabuildError};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse,
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode,
TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1,
WantDetail, WantStatusCode,
};
use rusqlite::types::FromSql;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error;
#[derive(Debug)]
/**
Design Notes
The build state struct is the heart of the service and orchestrator, adapting build events to
higher level questions about build state. One temptation is to implement the build state as a set
of hierarchically defined reducers, to achieve information hiding and to factor system capabilities and
state tracking simply. Unfortunately, to update state based on an event, you need a mutable borrow
of some part of the build state (that the reducer controls, for instance), and an immutable borrow
of the whole state for read/query purposes. The whole state needs to be available to handle state
updates like "this is the list of currently active job runs" in response to a job run event. Put
simply, this isn't possible without introducing some locking of the whole state and mutable state
subset, since they would conflict (the mutable subset would have already been borrowed, so can't
be borrowed immutably as part of the whole state borrow). You might also define a "query" phase
in which reducers query the state based on the received event, but that just increases complexity.
Instead, databuild opts for an entity-component system (ECS) that just provides the whole build
state mutably to all state update functionality, trusting that we know how to use it responsibly.
This means no boxing or "query phase", and means we can have all state updates happen as map lookups
and updates, which is exceptionally fast.
*/
#[derive(Debug, Clone)]
pub struct BuildState {
wants: BTreeMap<String, WantDetail>,
taints: BTreeMap<String, TaintDetail>,
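
A minimal sketch of the borrow problem the design note above describes, using simplified stand-in types (Want, JobRun, and State here are illustrative, not the real protobuf-generated types): a reducer that holds `&mut` to its own slice of the state cannot also be handed `&` to the whole state it was borrowed from, so updates instead flow through one method that takes `&mut self` for the entire state.

use std::collections::BTreeMap;

// Simplified stand-ins for WantDetail / JobRunDetail; illustrative only.
#[derive(Debug, Default, Clone)]
struct Want { building: bool }
#[derive(Debug, Default, Clone)]
struct JobRun { want_ids: Vec<String> }

#[derive(Debug, Default)]
struct State {
    wants: BTreeMap<String, Want>,
    job_runs: BTreeMap<String, JobRun>,
}

impl State {
    // ECS-style update: this method owns `&mut self`, so it may read one map
    // and mutate another freely. A reducer signature like
    // `fn reduce(slice: &mut BTreeMap<String, Want>, whole: &State, event: &Event)`
    // cannot be called with `slice` borrowed out of the same `State` value,
    // because `&mut state.wants` and `&state` would alias.
    fn handle_job_buffered(&mut self, job_run_id: &str) {
        let want_ids = self
            .job_runs
            .get(job_run_id)
            .map(|jr| jr.want_ids.clone())
            .unwrap_or_default();
        for want_id in want_ids {
            if let Some(want) = self.wants.get_mut(&want_id) {
                want.building = true;
            }
        }
    }
}

fn main() {
    let mut state = State::default();
    state.job_runs.insert("jr1".into(), JobRun { want_ids: vec!["w1".into()] });
    state.wants.insert("w1".into(), Want::default());
    state.handle_job_buffered("jr1");
    assert!(state.wants["w1"].building);
}
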
@@ -27,22 +58,210 @@ impl Default for BuildState {
}
impl BuildState {
pub fn count_job_runs(&self) -> usize {
self.job_runs.len()
}
pub fn handle_event(&mut self, event: &Event) -> () {
/// Handles reacting to events, updating state, and erroring if it's an invalid state transition
pub fn handle_event(&mut self, event: &Event) -> Result<(), DatabuildError> {
match event {
Event::WantCreateV1(e) => {
self.wants.insert(e.want_id.clone(), e.clone().into());
}
Event::WantCancelV1(e) => {
if let Some(want) = self.wants.get_mut(&e.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
// JobRun events
Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e),
Event::JobRunFailureV1(e) => self.handle_job_run_failure(e),
Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e),
Event::JobRunSuccessV1(e) => self.handle_job_run_success(e),
Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e),
// Want events
Event::WantCreateV1(e) => self.handle_want_create(e),
Event::WantCancelV1(e) => self.handle_want_cancel(e),
// Taint events
Event::TaintCreateV1(e) => self.handle_taint_create(e),
Event::TaintDeleteV1(e) => self.handle_taint_delete(e),
// Ruh roh!
_ => panic!("Unhandled event type! {:?}", event),
}
}
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Result<(), DatabuildError> {
self.wants
.insert(event.want_id.clone(), event.clone().into());
Ok(())
}
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
}
Ok(())
}
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Result<(), DatabuildError> {
// No job run should exist
if self.job_runs.get(&event.job_run_id).is_some() {
Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
} else {
// Create job run to be inserted
let job_run: JobRunDetail = event.clone().into();
for pref in job_run.building_partitions.iter() {
// Update all wants that point to this partition ref to `Building`
// Query notes: "update all wants that point to this partition to building"
if let Some(want) = self.wants.get_mut(&pref.r#ref) {
want.status = Some(WantStatusCode::WantBuilding.into());
}
}
_ => (),
self.job_runs.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
Ok(())
}
}
fn update_job_run_status(
&mut self,
job_run_id: &str,
status: JobRunStatusCode,
) -> Result<(), DatabuildError> {
if let Some(job_run) = self.job_runs.get_mut(job_run_id) {
job_run.last_heartbeat_at = Some(current_timestamp());
job_run.status = Some(status.into());
Ok(())
} else {
Err(format!("Job run ID {} not found", job_run_id).into())
}
}
fn update_partition_status(
&mut self,
pref: &PartitionRef,
status: PartitionStatusCode,
job_run_id: Option<&str>,
) -> Result<(), DatabuildError> {
if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
partition.status = Some(status.clone().into());
partition.last_updated_timestamp = Some(current_timestamp());
if let Some(job_run_id) = job_run_id.map(str::to_string) {
if !partition.job_run_ids.contains(&job_run_id) {
partition.job_run_ids.push(job_run_id);
}
}
} else {
// Partition doesn't exist yet, needs to be inserted
let want_ids = if let Some(jrid) = job_run_id {
let job_run = self.get_job_run(jrid).expect("Job run must exist for partition");
job_run.servicing_wants.iter().map(|wap| wap.want_id.clone()).collect()
} else {
vec![]
};
let partition = PartitionDetail {
r#ref: Some(pref.clone()),
status: Some(status.into()),
last_updated_timestamp: Some(current_timestamp()),
job_run_ids: job_run_id.map(|jrid| vec![jrid.to_string()]).unwrap_or(vec![]),
want_ids,
..PartitionDetail::default()
};
self.partitions.insert(pref.r#ref.clone(), partition);
};
self.update_wants_for_partition(&pref)
}
/// Walks the state from this want ID to update its status.
fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
if let Some(want) = self.wants.get(want_id) {
let details: Vec<Option<PartitionDetail>> = want
.upstreams
.iter()
.map(|pref| self.get_partition(&pref.r#ref))
.collect();
let status: WantStatusCode = details.into();
if let Some(mut_want) = self.wants.get_mut(want_id) {
mut_want.status = Some(status.into());
mut_want.last_updated_timestamp = current_timestamp();
}
Ok(())
} else {
Err(format!("Want id {} not found", want_id).into())
}
}
fn handle_job_run_heartbeat(
&mut self,
event: &JobRunHeartbeatEventV1,
) -> Result<(), DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)
}
fn handle_job_run_success(
&mut self,
event: &JobRunSuccessEventV1,
) -> Result<(), DatabuildError> {
println!("Job run success event: {:?}", event);
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Update partitions being built by this job
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionLive, Some(&event.job_run_id))?;
}
Ok(())
}
fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
// todo!("Go to every want that references this partition and update its status")
let want_ids = self
.partitions
.get(&pref.r#ref)
.map(|p| p.want_ids.clone())
.ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
for want_id in want_ids.iter() {
self.update_want_status(want_id)?;
}
Ok(())
}
fn handle_job_run_failure(
&mut self,
event: &JobRunFailureEventV1,
) -> Result<(), DatabuildError> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
let job_run = self.get_job_run(&event.job_run_id).unwrap();
for pref in job_run.building_partitions {
self.update_partition_status(&pref, PartitionStatusCode::PartitionFailed, Some(&event.job_run_id))?;
}
Ok(())
}
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Result<(), DatabuildError> {
todo!("should update already inserted job run, partition status, want status")
}
fn handle_job_run_dep_miss(
&mut self,
event: &JobRunMissingDepsEventV1,
) -> Result<(), DatabuildError> {
todo!("should update already inserted job run, schedule wants...?")
}
fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Result<(), DatabuildError> {
todo!("...?")
}
fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> {
todo!("...?")
}
fn with_wants(self, wants: BTreeMap<String, WantDetail>) -> Self {
Self { wants, ..self }
}
fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
Self { partitions, ..self }
}
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).cloned()
}
@@ -99,13 +318,133 @@ impl BuildState {
page_size,
}
}
/**
Wants are schedulable when their upstream partitions are live and not tainted
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
let live_details: Vec<&PartitionDetail> = want
.upstreams
.iter()
.map(|pref| self.partitions.get(&pref.r#ref))
.flatten()
.collect();
let live: Vec<PartitionRef> = live_details
.iter()
.map(|pd| pd.r#ref.clone().expect("pref must have ref"))
.collect();
let missing: Vec<PartitionRef> = want
.upstreams
.iter()
.filter(|pref| self.partitions.get(&pref.r#ref).is_none())
.cloned()
.collect();
let tainted: Vec<PartitionRef> = live_details
.iter()
.filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
.map(|pref| pref.r#ref.clone().unwrap())
.collect();
WantSchedulability {
want: want.clone(),
status: WantUpstreamStatus {
live,
tainted,
missing,
},
}
}
pub fn schedulable_wants(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
// Do not consider fulfilled or currently building wants in schedulability query
.filter(|w| {
w.status.clone().expect("want must have status").code
!= WantStatusCode::WantSuccessful as i32
})
.filter(|w| {
w.status.clone().expect("want must have status").code
!= WantStatusCode::WantBuilding as i32
})
.cloned()
.map(|w| self.want_schedulability(&w))
.collect(),
)
}
/// Maps a dep miss into the BEL events it implies, so that the job can be run successfully later
pub fn dep_miss_to_events(
&self,
dep_miss: &DepMissJobRun<SubProcessBackend>,
) -> Result<Vec<Event>, DatabuildError> {
let mut events = vec![];
// Append literal job run dep miss
events.push(dep_miss.state.to_event(&dep_miss.id()));
// Append wants from dep miss
let job_run_detail = self
.get_job_run(&dep_miss.job_run_id.to_string())
.ok_or(format!(
"Unable to find job run with id `{}`",
dep_miss.job_run_id
))?;
// Infer data/SLA timestamps from upstream want
let want_timestamps: WantTimestamps = job_run_detail
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No servicing wants found"))?;
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
dep_miss.state.missing_deps.clone(),
&dep_miss.job_run_id,
want_timestamps,
);
events.extend(want_events);
Ok(events)
}
}
/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<PartitionRef>,
pub tainted: Vec<PartitionRef>,
pub missing: Vec<PartitionRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
pub want: WantDetail,
pub status: WantUpstreamStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl Into<bool> for WantsSchedulability {
fn into(self) -> bool {
self.0.iter().all(|w| w.is_schedulable())
}
}
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
self.status.missing.is_empty() && self.status.tainted.is_empty()
}
}
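
As a concrete illustration of the rule above (a want is reported schedulable exactly when its upstream status lists no missing and no tainted partitions), a hedged test-style sketch; it assumes only the types defined in this file plus the `From<&str>` impl for `PartitionRef` added elsewhere in this change:

#[test]
fn schedulability_rule_example() {
    // All upstreams present and untainted -> schedulable.
    let ready = WantSchedulability {
        want: WantDetail::default(),
        status: WantUpstreamStatus {
            live: vec!["input/p1".into()],
            tainted: vec![],
            missing: vec![],
        },
    };
    assert!(ready.is_schedulable());

    // Any missing upstream partition blocks scheduling.
    let blocked = WantSchedulability {
        want: WantDetail::default(),
        status: WantUpstreamStatus {
            live: vec![],
            tainted: vec![],
            missing: vec!["input/p2".into()],
        },
    };
    assert!(!blocked.is_schedulable());
}
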
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
let start = page * page_size;
let end = start + page_size;
map.values().skip(start as usize).take(end as usize).cloned().collect()
map.values()
.skip(start as usize)
.take(end as usize)
.cloned()
.collect()
}
mod consts {
@@ -114,6 +453,106 @@ mod consts {
#[cfg(test)]
mod tests {
mod schedulable_wants {
use crate::build_state::BuildState;
use crate::{
PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantDetail,
WantStatus, WantStatusCode,
};
use std::collections::BTreeMap;
impl WantDetail {
fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
Self { partitions, ..self }
}
fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
Self { upstreams, ..self }
}
fn with_status(self, status: Option<WantStatus>) -> Self {
Self { status, ..self }
}
}
impl PartitionDetail {
fn with_status(self, status: Option<PartitionStatus>) -> Self {
Self { status, ..self }
}
fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
Self { r#ref, ..self }
}
}
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().schedulable_wants().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
#[test]
fn test_simple_want_with_live_upstream_is_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
WantDetail::default()
.with_partitions(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default().with_ref(Some(test_partition.into())),
)]));
// Should...
let schedulability = state.schedulable_wants();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
#[test]
fn test_simple_want_without_live_upstream_is_not_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default().with_wants(BTreeMap::from([(
test_partition.to_string(),
WantDetail::default()
.with_upstreams(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]));
// Should...
let schedulability = state.schedulable_wants();
let ws = schedulability.0.first().unwrap();
assert!(!ws.is_schedulable());
}
#[test]
#[ignore]
fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
WantDetail::default()
.with_partitions(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default()
.with_ref(Some(test_partition.into()))
.with_status(Some(PartitionStatusCode::PartitionTainted.into())),
)]));
// Should...
let schedulability = state.schedulable_wants();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
}
mod sqlite_build_state {
mod want {
use crate::build_state::BuildState;

databuild/data_deps.rs (new file, 243 lines)

@@ -0,0 +1,243 @@
use crate::data_build_event::Event;
use crate::{
JobRunMissingDeps, JobRunReadDeps, JobTriggeredEvent, MissingDeps, ReadDeps, WantCreateEventV1,
WantDetail,
};
use uuid::Uuid;
// TODO - how do we version this?
pub const DATABUILD_MISSING_DEPS_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
pub const DATABUILD_DEP_READ_JSON: &str = "DATABUILD_DEP_READ_JSON:";
pub enum DataDepLogLine {
DepMiss(JobRunMissingDeps),
DepRead(JobRunReadDeps),
}
impl From<DataDepLogLine> for String {
fn from(value: DataDepLogLine) -> Self {
match value {
DataDepLogLine::DepMiss(dm) => {
format!(
"{}{}",
DATABUILD_MISSING_DEPS_JSON,
serde_json::to_string(&dm).expect("json serialize")
)
}
DataDepLogLine::DepRead(dr) => {
format!(
"{}{}",
DATABUILD_DEP_READ_JSON,
serde_json::to_string(&dr).expect("json serialize")
)
}
}
}
}
#[derive(Default, Debug)]
pub struct JobRunDataDepResults {
pub reads: Vec<ReadDeps>,
pub misses: Vec<MissingDeps>,
}
impl JobRunDataDepResults {
pub fn with(mut self, dep_log_line: DataDepLogLine) -> Self {
match dep_log_line {
DataDepLogLine::DepMiss(dm) => self.misses.extend(dm.missing_deps),
DataDepLogLine::DepRead(rd) => self.reads.extend(rd.read_deps),
}
self
}
pub fn with_lines(mut self, lines: Vec<String>) -> Self {
lines
.iter()
.flat_map(|line| parse_log_line(line))
.fold(self, |agg, it| agg.with(it))
}
}
impl Into<JobRunDataDepResults> for Vec<String> {
fn into(self) -> JobRunDataDepResults {
JobRunDataDepResults::default().with_lines(self)
}
}
pub fn parse_log_line(line: &str) -> Option<DataDepLogLine> {
if let Some(message) = line_matches(line, DATABUILD_MISSING_DEPS_JSON) {
serde_json::from_str(message)
.ok()
.map(|dm| DataDepLogLine::DepMiss(dm))
} else if let Some(message) = line_matches(line, DATABUILD_DEP_READ_JSON) {
serde_json::from_str(message)
.ok()
.map(|dm| DataDepLogLine::DepRead(dm))
} else {
None
}
}
fn line_matches<'a>(line: &'a str, prefix: &'a str) -> Option<&'a str> {
line.trim().strip_prefix(prefix)
}
pub struct WantTimestamps {
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
}
impl From<WantDetail> for WantTimestamps {
fn from(want_detail: WantDetail) -> Self {
WantTimestamps {
data_timestamp: want_detail.data_timestamp,
ttl_seconds: want_detail.ttl_seconds,
sla_seconds: want_detail.sla_seconds,
}
}
}
impl WantTimestamps {
pub fn merge(self, other: WantTimestamps) -> WantTimestamps {
// TODO does this make sense?
WantTimestamps {
data_timestamp: self.data_timestamp.min(other.data_timestamp),
ttl_seconds: self.ttl_seconds.max(other.ttl_seconds),
sla_seconds: self.sla_seconds.max(other.sla_seconds),
}
}
}
pub fn missing_deps_to_want_events(
missing_deps: Vec<MissingDeps>,
job_run_id: &Uuid,
want_timestamps: WantTimestamps,
) -> Vec<Event> {
missing_deps
.iter()
.map(|md| {
Event::WantCreateV1(WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: md.missing.clone(),
data_timestamp: want_timestamps.data_timestamp,
ttl_seconds: want_timestamps.ttl_seconds,
sla_seconds: want_timestamps.sla_seconds,
source: Some(
JobTriggeredEvent {
job_run_id: job_run_id.to_string(),
}
.into(),
),
comment: Some("Missing data".to_string()),
})
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_missing_deps_with_1_to_1_and_1_to_n() {
let log_line = r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#.to_string();
let result = parse_log_line(&log_line);
assert!(result.is_some());
let missing_deps = match result.unwrap() {
DataDepLogLine::DepMiss(md) => md,
_ => panic!("expected dep miss log line"),
};
assert_eq!(missing_deps.missing_deps.len(), 2);
// First entry: 1:1 (one missing input -> one impacted output)
assert_eq!(missing_deps.missing_deps[0].impacted.len(), 1);
assert_eq!(missing_deps.missing_deps[0].impacted[0].r#ref, "output/p1");
assert_eq!(missing_deps.missing_deps[0].missing.len(), 1);
assert_eq!(missing_deps.missing_deps[0].missing[0].r#ref, "input/p1");
// Second entry: 1:N (one missing input -> multiple impacted outputs)
assert_eq!(missing_deps.missing_deps[1].impacted.len(), 2);
assert_eq!(missing_deps.missing_deps[1].impacted[0].r#ref, "output/p2");
assert_eq!(missing_deps.missing_deps[1].impacted[1].r#ref, "output/p3");
assert_eq!(missing_deps.missing_deps[1].missing.len(), 1);
assert_eq!(missing_deps.missing_deps[1].missing[0].r#ref, "input/p2");
}
/// We can accumulate dep miss and read events
#[test]
fn test_accumulate_dep_parse_and_miss() {
// Given
let r = JobRunDataDepResults::default();
assert_eq!(r.misses.len(), 0);
assert_eq!(r.reads.len(), 0);
// When
let r = r
.with(DataDepLogLine::DepRead(JobRunReadDeps {
version: "1".into(),
read_deps: vec![ReadDeps {
impacted: vec!["output/p1".into()],
read: vec!["input/p1".into()],
}],
}))
.with(DataDepLogLine::DepRead(JobRunReadDeps {
version: "1".into(),
read_deps: vec![ReadDeps {
impacted: vec!["output/p2".into()],
read: vec!["input/p2".into(), "input/p2".into()],
}],
}))
.with(DataDepLogLine::DepMiss(JobRunMissingDeps {
version: "1".into(),
missing_deps: vec![MissingDeps {
impacted: vec!["output/p3".into()],
missing: vec!["input/p3".into()],
}],
}));
}
/// It's acceptable to print separately for each missing dep
#[test]
fn test_parse_multiple_missing_deps() {
// Given
let r = JobRunDataDepResults::default();
let stdout_lines: Vec<String> = vec![
"something".into(),
DataDepLogLine::DepRead(JobRunReadDeps {
version: "1".into(),
read_deps: vec![ReadDeps {
impacted: vec!["output/p1".into()],
read: vec!["input/p1".into()],
}],
})
.into(),
DataDepLogLine::DepRead(JobRunReadDeps {
version: "1".into(),
read_deps: vec![ReadDeps {
impacted: vec!["output/p2".into()],
read: vec!["input/p2".into()],
}],
})
.into(),
"something else".into(),
DataDepLogLine::DepMiss(JobRunMissingDeps {
version: "1".into(),
missing_deps: vec![MissingDeps {
impacted: vec!["output/p3".into()],
missing: vec!["input/p3".into()],
}],
})
.into(),
];
// When
let results = r.with_lines(stdout_lines);
// Should
assert_eq!(results.misses.len(), 1);
assert_eq!(results.reads.len(), 2);
}
}
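
From the job's side, this protocol is just a prefixed JSON line written to stdout. A hedged sketch of the producer half (the helper function and the partition paths are illustrative; the types and the `DataDepLogLine` to `String` conversion are the ones defined above):

use crate::data_deps::DataDepLogLine;
use crate::{JobRunMissingDeps, MissingDeps};

// Hypothetical helper inside a job that discovers it cannot build
// `output/p1` because `input/p1` does not exist yet.
fn report_missing_input() {
    let line: String = DataDepLogLine::DepMiss(JobRunMissingDeps {
        version: "1".into(),
        missing_deps: vec![MissingDeps {
            impacted: vec!["output/p1".into()],
            missing: vec!["input/p1".into()],
        }],
    })
    .into();
    // Printed as: DATABUILD_MISSING_DEPS_JSON:{"version":"1",...}
    // which parse_log_line() on the orchestrator side turns back into a DepMiss.
    println!("{line}");
}
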


@@ -53,7 +53,8 @@ message WantAttributedPartitions {
message JobRunBufferEventV1 {
string job_run_id = 1;
string job_label = 2;
repeated WantAttributedPartitions want_attributed_partitions = 3;
repeated PartitionRef building_partitions = 3;
repeated WantAttributedPartitions want_attributed_partitions = 4;
// TODO how do we handle buffer definition? Start simple, noop until we want something here?
}
// Just indicates that job has entered queue
@@ -73,6 +74,7 @@ message JobRunSuccessEventV1 {
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEventV1 {
string job_run_id = 1;
string reason = 2;
}
// Job was explicitly canceled.
message JobRunCancelEventV1 {
@@ -84,12 +86,30 @@ message JobRunCancelEventV1 {
message JobRunMissingDepsEventV1 {
string job_run_id = 1;
repeated MissingDeps missing_deps = 2;
repeated ReadDeps read_deps = 3;
}
message JobRunReadDepsEventV1 {
string job_run_id = 1;
repeated ReadDeps read_deps = 2;
}
message JobRunMissingDeps {
string version = 1;
repeated MissingDeps missing_deps = 2;
}
message MissingDeps {
// The list of partition refs that are prevented from building by these missing deps (can be just 1)
repeated PartitionRef impacted = 1;
repeated PartitionRef missing = 2;
}
message JobRunReadDeps {
string version = 1;
repeated ReadDeps read_deps = 2;
}
message ReadDeps {
// The list of partition refs that are built using the read deps (can be just 1)
repeated PartitionRef impacted = 1;
repeated PartitionRef read = 2;
}
message WantCreateEventV1 {
@@ -147,14 +167,17 @@ enum WantStatusCode {
message WantDetail {
string want_id = 1;
// The partitions directly wanted by this want
repeated PartitionRef partitions = 2;
uint64 data_timestamp = 3;
uint64 ttl_seconds = 4;
uint64 sla_seconds = 5;
EventSource source = 6;
optional string comment = 7;
WantStatus status = 8;
uint64 last_updated_timestamp = 9;
// The upstream partitions, detected from a dep miss job run failure
repeated PartitionRef upstreams = 3;
uint64 data_timestamp = 4;
uint64 ttl_seconds = 5;
uint64 sla_seconds = 6;
EventSource source = 7;
optional string comment = 8;
WantStatus status = 9;
uint64 last_updated_timestamp = 10;
// TODO
}
@@ -167,6 +190,7 @@ message PartitionDetail {
optional uint64 last_updated_timestamp = 3;
// IDs that associate the partition with other objects
repeated string job_run_ids = 4;
// Wants that reference this partition
repeated string want_ids = 5;
repeated string taint_ids = 6;
}
@@ -188,9 +212,25 @@ message TaintDetail {
// TODO
}
message JobRunDetail {
// TODO
message JobRunStatus {
JobRunStatusCode code = 1;
string name = 2;
}
enum JobRunStatusCode {
JobRunQueued = 0;
JobRunRunning = 1;
JobRunFailed = 2;
JobRunCanceled = 3;
JobRunSucceeded = 4;
}
message JobRunDetail {
string id = 1;
JobRunStatus status = 2;
optional uint64 last_heartbeat_at = 3;
repeated PartitionRef building_partitions = 4;
repeated WantAttributedPartitions servicing_wants = 5;
}
message EventFilter {


@@ -1,6 +1,7 @@
use crate::util::current_timestamp;
use crate::{PartitionRef, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
impl From<&WantCreateEventV1> for WantDetail {
fn from(e: &WantCreateEventV1) -> Self {
@@ -12,12 +13,13 @@ impl From<WantCreateEventV1> for WantDetail {
WantDetail {
want_id: e.want_id,
partitions: e.partitions,
upstreams: vec![],
data_timestamp: e.data_timestamp,
ttl_seconds: e.ttl_seconds,
sla_seconds: e.sla_seconds,
source: e.source,
comment: e.comment,
status: Default::default(),
status: Some(Default::default()),
last_updated_timestamp: current_timestamp(),
}
}
@@ -33,6 +35,14 @@ impl From<WantCancelEventV1> for Event {
}
}
impl From<WantCreateEventV1> for WantAttributedPartitions {
fn from(value: WantCreateEventV1) -> Self {
Self {
want_id: value.want_id,
partitions: value.partitions,
}
}
}
impl From<WantStatusCode> for WantStatus {
fn from(code: WantStatusCode) -> Self {
@@ -43,6 +53,50 @@ impl From<WantStatusCode> for WantStatus {
}
}
impl From<JobRunBufferEventV1> for JobRunDetail {
fn from(value: JobRunBufferEventV1) -> Self {
Self {
id: value.job_run_id,
status: Some(JobRunStatusCode::JobRunQueued.into()),
last_heartbeat_at: None,
building_partitions: value.building_partitions,
servicing_wants: value.want_attributed_partitions,
}
}
}
pub fn want_status_matches_any(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
pds.iter()
.any(|pd| pd.clone()
.map(|pd| pd.status == Some(status.into()))
.unwrap_or(false))
}
pub fn want_status_matches_all(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
pds.iter()
.all(|pd| pd.clone()
.map(|pd| pd.status == Some(status.into()))
.unwrap_or(false))
}
/// Merges a list of partition details into a single status code.
/// Takes the lowest state as the want status.
impl Into<WantStatusCode> for Vec<Option<PartitionDetail>> {
fn into(self) -> WantStatusCode {
if want_status_matches_any(&self, PartitionFailed) {
WantStatusCode::WantFailed
} else if want_status_matches_all(&self, PartitionLive) {
WantStatusCode::WantSuccessful
} else if self.iter().any(|pd| pd.is_none()) {
WantStatusCode::WantBuilding
} else {
WantStatusCode::WantIdle
}
}
}
impl From<&str> for PartitionRef {
fn from(value: &str) -> Self {
Self {
@@ -50,3 +104,46 @@ impl From<&str> for PartitionRef {
}
}
}
impl From<PartitionStatusCode> for PartitionStatus {
fn from(code: PartitionStatusCode) -> Self {
PartitionStatus {
code: code.into(),
name: code.as_str_name().to_string(),
}
}
}
impl From<JobRunStatusCode> for JobRunStatus {
fn from(code: JobRunStatusCode) -> Self {
JobRunStatus {
code: code.into(),
name: code.as_str_name().to_string(),
}
}
}
impl From<ManuallyTriggeredEvent> for EventSource {
fn from(value: ManuallyTriggeredEvent) -> Self {
Self {
source: Some(event_source::Source::ManuallyTriggered(value)),
}
}
}
impl From<JobTriggeredEvent> for EventSource {
fn from(value: JobTriggeredEvent) -> Self {
Self {
source: Some(event_source::Source::JobTriggered(value)),
}
}
}
impl From<&WantDetail> for WantAttributedPartitions {
fn from(value: &WantDetail) -> Self {
Self {
want_id: value.want_id.clone(),
partitions: value.partitions.clone(),
}
}
}
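
A worked illustration of the merge rule above: any failed upstream partition makes the want failed; otherwise all-live means successful; otherwise any not-yet-materialized partition (a None detail) means building. A hedged test-style sketch using only the conversions defined in this file:

#[test]
fn want_status_merge_example() {
    use crate::{PartitionDetail, PartitionStatusCode, WantStatusCode};

    let with_status = |code: PartitionStatusCode| {
        Some(PartitionDetail {
            status: Some(code.into()),
            ..PartitionDetail::default()
        })
    };

    // All partitions live -> the want is successful.
    let all_live = vec![
        with_status(PartitionStatusCode::PartitionLive),
        with_status(PartitionStatusCode::PartitionLive),
    ];
    let status: WantStatusCode = all_live.into();
    assert_eq!(status, WantStatusCode::WantSuccessful);

    // One partition not materialized yet -> still building.
    let one_missing = vec![with_status(PartitionStatusCode::PartitionLive), None];
    let status: WantStatusCode = one_missing.into();
    assert_eq!(status, WantStatusCode::WantBuilding);

    // Any failed partition dominates.
    let one_failed = vec![
        with_status(PartitionStatusCode::PartitionLive),
        with_status(PartitionStatusCode::PartitionFailed),
    ];
    let status: WantStatusCode = one_failed.into();
    assert_eq!(status, WantStatusCode::WantFailed);
}
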


@@ -1,22 +1,21 @@
use crate::job_run::JobRun;
use crate::job_run::{NotStartedJobRun, SubProcessBackend};
use crate::{PartitionRef, WantDetail};
use regex::Regex;
use std::error::Error;
#[derive(Debug, Clone)]
pub struct JobConfiguration {
pub label: String,
pub pattern: String,
pub entrypoint: String,
pub entry_point: String,
}
impl JobConfiguration {
/** Launch job to build the partitions specified by the provided wants. */
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
let wanted_refs: Vec<PartitionRef> =
wants.iter().flat_map(|want| want.partitions.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
JobRun::spawn(self.entrypoint.clone(), args)
Ok(NotStartedJobRun::spawn(self.entry_point.clone(), args))
}
pub fn matches(&self, refs: &PartitionRef) -> bool {


@@ -1,164 +1,542 @@
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
use crate::data_deps::JobRunDataDepResults;
use crate::{
EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus,
JobRunSuccessEventV1, MissingDeps, ReadDeps,
};
use crate::util::DatabuildError;
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::marker::PhantomData;
use std::process::{Child, Command, Stdio};
use uuid::Uuid;
// TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
// Leave door open to background log processor that tails job logs, but don't include in jobrun concept
/** Wrapper type that can be mocked */
trait JobRunChild {
fn exit_status(&mut self) -> Option<ExitStatus>;
fn stdout_lines(&mut self) -> Vec<String>;
}
/// Backend trait that defines the state types and transition logic for different job run implementations
pub trait JobRunBackend: Sized {
type NotStartedState;
type RunningState;
type CompletedState;
type FailedState;
type CanceledState;
type DepMissState;
#[derive(Debug)]
struct WrappedProcessChild(Child);
/// Create a new not-started job run
fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
impl JobRunChild for WrappedProcessChild {
fn exit_status(&mut self) -> Option<ExitStatus> {
self.0.try_wait().expect("coudn't wait")
/// Convenience method to spawn a new job run (calls create and wraps in JobRun)
fn spawn(entry_point: String, args: Vec<String>) -> NotStartedJobRun<Self> {
NotStartedJobRun::spawn(entry_point, args)
}
fn stdout_lines(&mut self) -> Vec<String> {
let mut stdout_lines = Vec::new();
let stdout = self.0.stdout.take().expect("stdout not piped");
let reader = BufReader::new(stdout);
for line in reader.lines() {
stdout_lines.push(line.expect("stdout not piped"));
/// Transition from NotStarted to Running
fn start(
not_started: Self::NotStartedState,
env: Option<HashMap<String, String>>,
) -> Result<Self::RunningState, DatabuildError>;
/// Poll a running job for state changes
fn poll(
running: &mut Self::RunningState,
) -> Result<
PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
DatabuildError,
>;
/// Cancel a running job
fn cancel_job(
running: Self::RunningState,
source: EventSource,
) -> Result<Self::CanceledState, DatabuildError>;
}
/// Result of polling a running job
pub enum PollResult<C, F, D> {
StillRunning,
Completed(C),
Failed(F),
DepMiss(D),
}
/// Generic JobRun that works with any backend, parameterized by state
pub struct JobRun<B: JobRunBackend, S> {
pub job_run_id: Uuid,
pub state: S,
pub _backend: PhantomData<B>,
}
/// Type aliases for specific states
pub type NotStartedJobRun<B> = JobRun<B, <B as JobRunBackend>::NotStartedState>;
pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
pub type DepMissJobRun<B> = JobRun<B, <B as JobRunBackend>::DepMissState>;
// Methods available on all JobRun states
impl<B: JobRunBackend, S> JobRun<B, S> {
pub fn id(&self) -> Uuid {
self.job_run_id
}
}
// Methods available only on NotStarted state
impl<B: JobRunBackend> NotStartedJobRun<B> {
pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
JobRun {
job_run_id: Uuid::new_v4(),
state: B::create(entry_point, args),
_backend: PhantomData,
}
stdout_lines
}
}
impl From<Child> for WrappedProcessChild {
fn from(child: Child) -> Self {
Self { 0: child }
pub fn run(self) -> Result<RunningJobRun<B>, DatabuildError> {
self.run_with_env(None)
}
}
pub struct JobRun {
job_run_id: Uuid,
events: MemoryBELStorage,
child: Box<dyn JobRunChild>,
unhandled_lines: Vec<String>,
}
const EVENT_SIZE_LIMIT: u64 = 1000000;
impl JobRun {
pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
pub fn run_with_env(
self,
env: Option<HashMap<String, String>>,
) -> Result<RunningJobRun<B>, DatabuildError> {
let running_state = B::start(self.state, env)?;
Ok(JobRun {
job_run_id: Default::default(),
events: Default::default(),
child: Box::new(WrappedProcessChild::from(
Command::new(command)
.args(args)
.stdout(Stdio::piped())
.spawn()?,
)),
unhandled_lines: Default::default(),
job_run_id: self.job_run_id,
state: running_state,
_backend: PhantomData,
})
}
}
// Methods available only on Running state
impl<B: JobRunBackend> RunningJobRun<B> {
pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, DatabuildError> {
match B::poll(&mut self.state)? {
PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
PollResult::Completed(completed_state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::Completed(JobRun {
job_run_id,
state: completed_state,
_backend: PhantomData,
}))
}
PollResult::Failed(failed_state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::Failed(JobRun {
job_run_id,
state: failed_state,
_backend: PhantomData,
}))
}
PollResult::DepMiss(state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::DepMiss(JobRun {
job_run_id,
state,
_backend: PhantomData,
}))
}
}
}
pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, DatabuildError> {
let canceled_state = B::cancel_job(self.state, source)?;
Ok(JobRun {
job_run_id: self.job_run_id,
state: canceled_state,
_backend: PhantomData,
})
}
}
/// Result of visiting a running job
pub enum JobRunVisitResult<B: JobRunBackend> {
StillRunning,
Completed(CompletedJobRun<B>),
Failed(FailedJobRun<B>),
DepMiss(DepMissJobRun<B>),
}
pub enum JobRunConfig {
SubProcess {
entry_point: String,
args: Vec<String>,
},
}
// ===== SubProcess Backend Implementation =====
/// SubProcess backend for running jobs as local subprocesses
pub struct SubProcessBackend;
/// NotStarted state for SubProcess backend
pub struct SubProcessNotStarted {
pub entry_point: String,
pub args: Vec<String>,
}
/// Running state for SubProcess backend
pub struct SubProcessRunning {
pub process: Child,
pub stdout_buffer: Vec<String>,
}
/// Completed state for SubProcess backend
pub struct SubProcessCompleted {
pub exit_code: i32,
pub stdout_buffer: Vec<String>,
pub read_deps: Vec<ReadDeps>,
}
/// Failed state for SubProcess backend
pub struct SubProcessFailed {
pub exit_code: i32,
pub reason: String,
pub stdout_buffer: Vec<String>,
}
/// Canceled state for SubProcess backend
pub struct SubProcessCanceled {
pub source: EventSource,
pub stdout_buffer: Vec<String>,
}
pub struct SubProcessDepMiss {
pub stdout_buffer: Vec<String>,
pub missing_deps: Vec<MissingDeps>,
pub read_deps: Vec<ReadDeps>,
}
impl JobRunBackend for SubProcessBackend {
type NotStartedState = SubProcessNotStarted;
type RunningState = SubProcessRunning;
type CompletedState = SubProcessCompleted;
type FailedState = SubProcessFailed;
type CanceledState = SubProcessCanceled;
type DepMissState = SubProcessDepMiss;
fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState {
SubProcessNotStarted { entry_point, args }
}
fn start(
not_started: Self::NotStartedState,
env: Option<HashMap<String, String>>,
) -> Result<Self::RunningState, DatabuildError> {
let process = Command::new(not_started.entry_point)
.args(not_started.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.envs(env.unwrap_or_default())
.spawn()?;
Ok(SubProcessRunning {
process,
stdout_buffer: Vec::new(),
})
}
pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
// Collect new lines from child process
// Parse BEL events from child process
let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
for event in new_events {
self.events.append_event(&event)?;
}
self.unhandled_lines.drain(..);
// Potentially react to job completion
match self.exit_status() {
None => {} // No exit -> no harm
Some(status) => {
if status.success() {
self.events
.append_event(&JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
} else {
self.events
.append_event(&JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
fn poll(
running: &mut Self::RunningState,
) -> Result<
PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
DatabuildError,
> {
// Non-blocking check for exit status
if let Some(exit_status) = running.process.try_wait()? {
// Job has exited
// Read any remaining stdout
if let Some(stdout) = running.process.stdout.take() {
let reader = BufReader::new(stdout);
for line in reader.lines() {
// TODO we should write lines to the job's file logs
if let Ok(line) = line {
running.stdout_buffer.push(line);
}
}
}
}
// Return BEL events since provided idx
self.events
.list_events(since_idx, EVENT_SIZE_LIMIT)
.and_then(|events| {
if events.len() as u64 == EVENT_SIZE_LIMIT {
Err(format!(
"Returned {} events - that's way too many.",
EVENT_SIZE_LIMIT
)
.into())
} else {
Ok(events)
// Take ownership of stdout_buffer, parse dep events
let stdout_buffer = std::mem::take(&mut running.stdout_buffer);
let deps: JobRunDataDepResults = stdout_buffer.clone().into();
// Check exit status and return appropriate result
match exit_status.code() {
Some(0) => {
// Success case
Ok(PollResult::Completed(SubProcessCompleted {
exit_code: 0,
stdout_buffer,
read_deps: deps.reads,
}))
}
})
}
pub fn cancel(&mut self) {
todo!()
}
pub fn event_for_line(line: String) -> Option<Event> {
// TODO parse missing data dep event
// TODO parse job state
None
}
pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
let mut events: Vec<Event> = Default::default();
if lines.len() > 0 {
// If any lines were written to stdout, we should heartbeat
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.clone().into(),
}));
Some(code) => {
// Failed with exit code
match deps.misses {
vec if vec.is_empty() => {
// No missing deps, job failed
let reason = format!("Job failed with exit code {}", code);
Ok(PollResult::Failed(SubProcessFailed {
exit_code: code,
reason,
stdout_buffer,
}))
}
misses => Ok(PollResult::DepMiss(SubProcessDepMiss {
stdout_buffer,
missing_deps: misses,
read_deps: deps.reads,
})),
}
}
None => {
// Terminated by signal (Unix) - treat as failure
let reason = format!("Job terminated by signal: {}", exit_status);
Ok(PollResult::Failed(SubProcessFailed {
exit_code: -1,
reason,
stdout_buffer,
}))
}
}
} else {
// Still running
Ok(PollResult::StillRunning)
}
}
for event in lines
.iter()
.flat_map(|line| Self::event_for_line(line.clone()))
{
events.push(event);
fn cancel_job(
mut running: Self::RunningState,
source: EventSource,
) -> Result<Self::CanceledState, DatabuildError> {
// Kill the process
running.process.kill()?;
// Wait for it to actually terminate
running.process.wait()?;
// Return canceled state
Ok(SubProcessCanceled {
source,
stdout_buffer: running.stdout_buffer,
})
}
}
// Helper functions to convert between states and events
impl SubProcessCompleted {
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: job_run_id.to_string(),
})
}
}
impl SubProcessFailed {
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: job_run_id.to_string(),
reason: self.reason.clone(),
})
}
}
impl SubProcessCanceled {
pub fn to_event(&self, job_run_id: &Uuid) -> JobRunCancelEventV1 {
JobRunCancelEventV1 {
job_run_id: job_run_id.to_string(),
source: Some(self.source.clone()),
comment: Some("Job was canceled".to_string()),
}
events
}
}
pub fn exit_status(&mut self) -> Option<ExitStatus> {
self.child.exit_status()
impl SubProcessDepMiss {
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
job_run_id: job_run_id.to_string(),
missing_deps: self.missing_deps.clone(),
read_deps: self.read_deps.clone(),
})
}
}
// Old JobRunPollResult structure - kept for compatibility during migration
pub struct JobRunPollResult {
pub new_events: Vec<Event>,
pub status: JobRunStatus,
}
mod tests {
use crate::job_run::JobRun;
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend};
use crate::mock_job_run::MockJobRun;
use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps};
/// Happy path - run that succeeds should emit a JobRunSuccessEventV1
#[test]
fn test_process_lines_empty() {
let lines = Vec::<String>::new();
let events = JobRun::process_lines(Default::default(), &lines);
assert_eq!(events.len(), 0);
fn test_job_run_success_returns_job_run_success_event() {
// Spawn a job run that will succeed (exit code 0)
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
// Start the job - this consumes the NotStarted and returns Running
let mut running_job = job_run.run().unwrap();
// Poll until we get completion
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(completed) => {
// Generate the event from the completed state
let event = completed.state.to_event(&completed.id());
assert!(matches!(event, Event::JobRunSuccessV1(_)));
break;
}
JobRunVisitResult::Failed(failed) => {
panic!("Job failed unexpectedly: {}", failed.state.reason);
}
JobRunVisitResult::StillRunning => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
JobRunVisitResult::DepMiss(dep_miss) => {
panic!("Job dep miss unexpectedly");
}
}
}
}
/// Job run that fails should emit a JobRunFailureEventV1
#[test]
fn test_process_lines_heartbeat() {
let lines_1 = vec!["Hello, salem".to_string()];
let events_1 = JobRun::process_lines(Default::default(), &lines_1);
assert_eq!(events_1.len(), 1);
fn test_job_run_failure_returns_job_run_failure_event() {
// Spawn a job run
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
let events_2 = JobRun::process_lines(Default::default(), &lines_2);
assert_eq!(events_2.len(), 1);
// Start the job with an exit code that indicates failure (non-zero)
let env = MockJobRun::new().exit_code(1).to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(_) => {
panic!("Job succeeded unexpectedly");
}
JobRunVisitResult::Failed(failed) => {
// Generate the event from the failed state
let event = failed.state.to_event(&failed.id());
assert!(matches!(event, Event::JobRunFailureV1(_)));
break;
}
JobRunVisitResult::StillRunning => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
JobRunVisitResult::DepMiss(dep_miss) => {
panic!("Job dep miss unexpectedly");
}
}
}
}
/// Canceling a job run before it completes should result in:
/// - Stopping the actual subprocess (e.g. no output file should be written)
/// - Emitting a JobRunCancelEventV1 event
#[test]
fn test_job_run_cancel_returns_job_run_cancel_event() {
use crate::ManuallyTriggeredEvent;
use std::fs;
use uuid::Uuid;
// Create a temp file path for the test
let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let env = MockJobRun::new()
.sleep_ms(1000)
.output_file(&temp_file, &"completed".to_string())
.exit_code(0)
.to_env();
let running_job = job_run.run_with_env(Some(env)).unwrap();
// Give it a tiny bit of time to start
std::thread::sleep(std::time::Duration::from_millis(10));
// Cancel the job before it can complete - this consumes the running job and returns canceled
let canceled_job = running_job
.cancel(
ManuallyTriggeredEvent {
user: "test_user".into(),
}
.into(),
)
.unwrap();
// Generate the cancel event from the canceled state
let cancel_event = canceled_job.state.to_event(&canceled_job.id());
// Verify we got the cancel event
assert_eq!(cancel_event.job_run_id, canceled_job.id().to_string());
assert!(cancel_event.source.is_some());
assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
// Verify the output file was NOT written (process was killed before it could complete)
assert!(
!std::path::Path::new(&temp_file).exists(),
"Output file should not exist - process should have been killed"
);
// Cleanup just in case
let _ = fs::remove_file(&temp_file);
}
/// Job run that fails and emits a recognized "dep miss" statement should emit a JobRunMissingDepsEventV1
#[test]
fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
// Spawn a job run that will report missing deps and exit non-zero
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let expected_dep_miss = JobRunMissingDeps {
version: "1".into(),
missing_deps: vec![MissingDeps {
impacted: vec!["my_fav_output".into()],
missing: vec!["cool_input_1".into(), "cool_input_2".into()],
}],
};
let dep_miss_json =
serde_json::to_string(&expected_dep_miss).expect("Failed to serialize dep miss");
let dep_miss_line = format!("{}{}", DATABUILD_MISSING_DEPS_JSON, dep_miss_json);
let env = MockJobRun::new()
.stdout_msg(&dep_miss_line)
.exit_code(1)
.to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(_) => {
panic!("Job succeeded unexpectedly");
}
JobRunVisitResult::Failed(failed) => {
panic!("Job failed unexpectedly: {}", failed.state.reason);
}
JobRunVisitResult::StillRunning => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
JobRunVisitResult::DepMiss(backend) => {
assert_eq!(backend.state.missing_deps, expected_dep_miss.missing_deps);
break;
}
}
}
}
}

View file

@ -6,6 +6,8 @@ mod util;
mod build_state;
mod event_transforms;
mod event_defaults;
mod data_deps;
mod mock_job_run;
// Include generated protobuf code
include!("databuild.rs");

83
databuild/mock_job_run.rs Normal file
View file

@ -0,0 +1,83 @@
use std::collections::HashMap;
pub struct MockJobRun {
sleep_ms: u64,
stdout_msg: String,
output_file: Option<OutputFile>,
exit_code: u8,
}
pub struct OutputFile {
path: String,
contents: String,
}
impl Default for MockJobRun {
fn default() -> Self {
Self {
sleep_ms: 0,
stdout_msg: "test executed".to_string(),
output_file: None,
exit_code: 0,
}
}
}
impl MockJobRun {
pub fn new() -> Self {
Self::default()
}
pub fn sleep_ms(mut self, val: u64) -> Self {
self.sleep_ms = val;
self
}
pub fn stdout_msg(mut self, val: &String) -> Self {
self.stdout_msg = val.into();
self
}
pub fn output_file(mut self, path: &String, contents: &String) -> Self {
self.output_file = Some(OutputFile {
path: path.to_string(),
contents: contents.to_string(),
});
self
}
pub fn exit_code(mut self, val: u8) -> Self {
self.exit_code = val;
self
}
pub fn to_env(&self) -> HashMap<String, String> {
let mut env = HashMap::new();
env.insert(
"DATABUILD_TEST_SLEEP_MS".to_string(),
self.sleep_ms.to_string(),
);
env.insert(
"DATABUILD_TEST_EXIT_CODE".to_string(),
self.exit_code.to_string(),
);
env.insert("DATABUILD_TEST_STDOUT".to_string(), self.stdout_msg.clone());
if let Some(output_file) = &self.output_file {
env.insert(
"DATABUILD_TEST_OUTPUT_FILE".to_string(),
output_file.path.clone(),
);
env.insert(
"DATABUILD_TEST_OUTPUT_CONTENTS".to_string(),
output_file.contents.clone(),
);
}
env
}
pub fn bin_path() -> String {
std::env::var("TEST_SRCDIR")
.map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir))
.unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string())
}
}

View file

@ -1,41 +1,95 @@
use crate::build_event_log::{BELStorage, BuildEventLog};
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail};
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
SubProcessBackend,
};
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use crate::util::DatabuildError;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
the visitor pattern to monitor job exec progress and liveness, and adds
the visitor pattern to monitor job exec progress and liveness.
JTBDs:
- Orchestrator turns job run dep miss failures into derivative wants for the missed partitions
- Orchestrator turns schedulable wants into job runs to build the requested partitions
- Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
jobs when possible
*/
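// A minimal driving loop, sketched in comments (the wiring below is illustrative, not an
// existing entrypoint; it assumes `MemoryBELStorage: Default` and a pre-built `want` event):
//
//     let mut orch = Orchestrator::new(MemoryBELStorage::default(), OrchestratorConfig::default());
//     orch.bel.append_event(&Event::WantCreateV1(want))?; // register work to do
//     orch.step()?;                                       // poll job runs, then wants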
struct Orchestrator<S: BELStorage + Debug> {
bel: BuildEventLog<S>,
job_runs: Vec<JobRunHandle>,
config: OrchestratorConfig,
pub bel: BuildEventLog<S>,
pub not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
pub running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
pub completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
pub failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
pub dep_miss_jobs: Vec<DepMissJobRun<SubProcessBackend>>,
pub config: OrchestratorConfig,
}
struct JobRunHandle {
job_run: JobRun,
bel_idx: u64,
}
impl From<JobRun> for JobRunHandle {
fn from(job_run: JobRun) -> Self {
impl Default for Orchestrator<MemoryBELStorage> {
fn default() -> Self {
Self {
job_run,
bel_idx: 0,
bel: Default::default(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: Default::default(),
}
}
}
impl Orchestrator<MemoryBELStorage> {
fn copy(&self) -> Self {
Self {
bel: self.bel.clone(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: self.config.clone(),
}
}
}
impl<S: BELStorage + Debug> Orchestrator<S> {
fn with_config(self, config: OrchestratorConfig) -> Self {
Self { config, ..self }
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self {
config: self.config.with_jobs(jobs),
..self
}
}
fn with_bel(self, bel: BuildEventLog<S>) -> Self {
Self { bel, ..self }
}
}
#[derive(Debug, Clone)]
struct OrchestratorConfig {
jobs: Vec<JobConfiguration>,
}
impl Default for OrchestratorConfig {
fn default() -> Self {
Self {
jobs: Vec::default(),
}
}
}
impl OrchestratorConfig {
fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.label == label).cloned()
@ -44,6 +98,14 @@ impl OrchestratorConfig {
fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
self.jobs.iter().find(|job| job.matches(pref)).cloned()
}
fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
Self { jobs }
}
fn with_job(self, job: JobConfiguration) -> Self {
Self { jobs: vec![job] }
}
}
#[derive(Debug, Clone)]
@ -52,6 +114,12 @@ struct WantGroup {
wants: Vec<WantDetail>,
}
impl WantGroup {
pub fn spawn(&self) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
self.job.spawn(self.wants.clone())
}
}
#[derive(Debug, Clone)]
struct GroupedWants {
want_groups: Vec<WantGroup>,
@ -62,54 +130,123 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
job_runs: Vec::new(),
not_started_jobs: Vec::new(),
running_jobs: Vec::new(),
completed_jobs: Vec::new(),
failed_jobs: Vec::new(),
dep_miss_jobs: Vec::new(),
config,
}
}
/** Continuously invoked function to watch job run status */
fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
// Visit existing jobs, remove completed
self.job_runs.retain_mut(|jr| {
// Append emitted events
let events = jr
.job_run
.visit(jr.bel_idx.clone())
.expect("Job visit failed");
events
.iter()
.filter_map(|event| event.event.clone())
.for_each(|event| {
self.bel
.append_event(&event)
.expect("Failed to append event");
});
fn job_runs_count(&self) -> usize {
self.not_started_jobs.len()
+ self.running_jobs.len()
+ self.completed_jobs.len()
+ self.failed_jobs.len()
+ self.dep_miss_jobs.len()
}
// Retain job run if it doesn't yet have an exit code (still running)
jr.job_run.exit_status().is_none()
});
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::JobRunVisitResult;
// Coherence check setup
let total_runs_count = self.job_runs_count();
// First, start any not-started jobs
while let Some(job) = self.not_started_jobs.pop() {
let running = job.run()?;
self.running_jobs.push(running);
}
// Visit running jobs and transition them to terminal states
let mut still_running = Vec::new();
// TODO make sure that failure in the middle can't mess up build state - likely need to
// refactor here (e.g. turn state changes into data, commit them after all have been
// calculated and validated)
for mut job in self.running_jobs.drain(..) {
match job.visit()? {
JobRunVisitResult::StillRunning => {
println!("Still running job: {:?}", job.id());
still_running.push(job);
}
JobRunVisitResult::Completed(completed) => {
// Emit success event
println!("Completed job: {:?}", completed.id());
let result = run_complete_to_events(&self.bel.state, &completed)?;
for event in result.events {
self.bel.append_event(&event)?;
}
// Move job to completed
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
// Emit failure event
println!("Failed job: {:?}", failed.id());
let event: Event = failed.state.to_event(&failed.id());
self.bel.append_event(&event)?;
self.failed_jobs.push(failed);
}
JobRunVisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
self.bel.append_event(&event)?;
}
// Record missing upstream status in want details
self.dep_miss_jobs.push(dep_miss);
}
}
}
self.running_jobs = still_running;
// Panic because this should never happen
assert_eq!(
self.job_runs_count(),
total_runs_count,
"Detected job run count change during job run visit (should never happen)"
);
Ok(())
}
/** Continuously invoked function to watch wants and schedule new jobs */
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
fn poll_wants(&mut self) -> Result<(), DatabuildError> {
// Collect unhandled wants, group by job that handles each partition,
let grouped_wants =
Orchestrator::<S>::group_wants(&self.config, &self.bel.schedulable_wants());
let schedulability = self.bel.state.schedulable_wants();
println!("schedulability: {:?}", schedulability);
let schedulable_wants = schedulability
.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect();
let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
println!("grouped wants: {:?}", grouped_wants);
if !grouped_wants.want_groups.is_empty() {
if !grouped_wants.unhandled_wants.is_empty() {
// All wants must be mapped to jobs that can be handled
// TODO we probably want to handle this gracefully in the near future
Err(format!(
"Unable to map following wants: {:?}",
&grouped_wants.want_groups
&grouped_wants.unhandled_wants
)
.into())
} else {
// Spawn jobs and add events
for wg in grouped_wants.want_groups {
let job_run = wg.job.spawn(wg.wants)?;
self.job_runs.push(JobRunHandle::from(job_run));
let job_run = wg.spawn()?;
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_label: wg.job.label,
building_partitions: wg.wants.iter().flat_map(|w| w.partitions.clone()).collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
}
Ok(())
@ -142,17 +279,103 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
fn step(&mut self) -> Result<(), DatabuildError> {
self.poll_job_runs()?;
self.poll_wants()?;
Ok(())
}
/** Entrypoint for running jobs */
pub fn join(mut self) -> Result<(), Box<dyn Error>> {
pub fn join(&mut self) -> Result<(), DatabuildError> {
loop {
self.poll_job_runs()?;
self.poll_wants()?;
self.step()?
}
}
}
#[derive(Default, Clone, Debug)]
pub struct JobRunCompleteResult {
/// Events to append to the BEL from this job completing
pub events: Vec<Event>,
}
/// Handle successful run completion:
/// - Adding run success event
/// - Updating status for partitions actually built by the job
fn run_complete_to_events(
bel_state: &BuildState,
completed: &CompletedJobRun<SubProcessBackend>,
) -> Result<JobRunCompleteResult, DatabuildError> {
let events = vec![
// Event marking completion of job
completed.state.to_event(&completed.id()),
];
// let job_detail = bel_state
// .get_job_run(&completed.job_run_id.to_string())
// .ok_or(format!(
// "No job run found for id `{}`",
// completed.job_run_id
// ))?;
Ok(JobRunCompleteResult {
// built_partitions: job_detail.building_partitions,
events,
})
}
#[cfg(test)]
mod tests {
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use crate::WantCreateEventV1;
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
Orchestrator::default()
}
impl WantCreateEventV1 {
/// A naive random want for testing purposes
pub fn sample() -> Self {
Self {
want_id: Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: current_timestamp(),
ttl_seconds: 1000,
sla_seconds: 1000,
source: None,
comment: Some("test want".to_string()),
}
}
}
/// Scenario 1
/// A test scenario that simulates a databuild application with 2 jobs, alpha and beta, with
/// beta depending on a single output from alpha, and alpha with no deps.
fn setup_scenario_a_to_b(
mut orchestrator: Orchestrator<MemoryBELStorage>,
) -> Orchestrator<MemoryBELStorage> {
// Define test jobs
orchestrator.config = OrchestratorConfig {
jobs: vec![
JobConfiguration {
label: "alpha".to_string(),
pattern: "data/alpha".to_string(),
entry_point: MockJobRun::bin_path(),
},
JobConfiguration {
label: "beta".to_string(),
pattern: "data/beta".to_string(),
entry_point: MockJobRun::bin_path(),
},
],
};
orchestrator
}
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
@ -189,48 +412,243 @@ mod tests {
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::WantCreateEventV1;
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
#[ignore]
fn test_empty_wants_noop() {
todo!()
let mut orchestrator = build_orchestrator();
// Should init with no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
orchestrator
.poll_wants()
.expect("shouldn't fail to poll empty wants");
// Should still be empty since no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
// (but in this case using a noop/mock child process)
#[test]
#[ignore]
fn test_schedulable_wants_should_schedule() {
todo!()
// Given
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
let events = vec![Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 0);
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
assert_eq!(orchestrator.not_started_jobs.len(), 0);
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
// When
assert_eq!(orchestrator.bel.state.schedulable_wants().0.len(), 1);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll wants");
// Should schedule alpha job
assert_eq!(orchestrator.not_started_jobs.len(), 1);
assert_eq!(
orchestrator
.not_started_jobs
.first()
.unwrap()
.state
.args,
vec!["data/alpha"],
"should have scheduled alpha job"
);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
}
// Use case: A schedulable want that can't be matched to a job should return an error
#[test]
#[ignore]
fn test_schedulable_want_no_matching_job() {
todo!()
// Given
let mut orchestrator = build_orchestrator();
let events = vec![Event::WantCreateV1(WantCreateEventV1 {
partitions: vec!["data/alpha".into()],
..WantCreateEventV1::sample()
})];
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
// When: no job in the (empty) config matches "data/alpha", polling wants should surface an error
assert!(orchestrator.poll_wants().is_err());
// Should not have scheduled any jobs
assert_eq!(orchestrator.not_started_jobs.len(), 0);
}
}
// Orchestrator want creation is the means of data dependency propagation, allowing the
// orchestrator to create partitions needed by jobs that produce the existing wanted partitions.
/// Orchestrator want creation is the means of data dependency propagation, allowing the
/// orchestrator to create partitions needed by jobs that produce the existing wanted partitions.
mod want_create {
// Use case: The orchestrator should map a failed job into a set of wants
use crate::data_build_event::Event;
use crate::job_run::{DepMissJobRun, SubProcessDepMiss};
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{
JobRunBufferEventV1, MissingDeps, WantAttributedPartitions, WantCreateEventV1,
};
use std::marker::PhantomData;
use uuid::Uuid;
/// Use case: The orchestrator should map a failed job into a set of wants
#[test]
#[ignore]
fn test_job_fail_want_mapping() {
todo!()
// Given the alpha/beta scenario, with an originating want for data/beta
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// Add event for originating want
let want_create = WantCreateEventV1::sample();
let building_partitions = vec!["data/beta".into()];
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: building_partitions.clone(),
..want_create.clone()
}))
.expect("event append");
// Create failed job run detail
let want_attributed_partitions: Vec<WantAttributedPartitions> =
vec![want_create.clone().into()];
let job_run_id = Uuid::new_v4();
let job_run = JobRunBufferEventV1 {
job_run_id: job_run_id.into(),
building_partitions: building_partitions.clone(),
want_attributed_partitions: want_attributed_partitions.clone(),
..JobRunBufferEventV1::default()
};
orchestrator
.bel
.append_event(&Event::JobRunBufferV1(job_run))
.expect("event append");
// Job runs should not be empty
orchestrator
.bel
.state
.get_job_run(&job_run_id.to_string())
.expect("job run should exist");
// Add event for job failure
let dep_miss_job_run = DepMissJobRun {
job_run_id,
state: SubProcessDepMiss {
stdout_buffer: vec![],
missing_deps: vec![MissingDeps {
impacted: vec!["data/beta".into()],
missing: vec!["data/alpha".into()],
}],
read_deps: vec![],
},
_backend: PhantomData,
};
// When calculating events from dep miss
// TODO this needs to be migrated - orchestrator shouldn't contain mapping logic
let events = orchestrator
.bel
.state
.dep_miss_to_events(&dep_miss_job_run)
.unwrap();
// Should have scheduled a job for alpha
assert_eq!(
events
.iter()
.filter(|e| match e {
Event::WantCreateV1(e) => e.partitions.contains(&"data/alpha".into()),
_ => false,
})
.count(),
1
);
assert!(
orchestrator.not_started_jobs.is_empty(),
"shouldn't have scheduled yet"
);
// Should schedule job after we poll wants
orchestrator.poll_wants().expect("poll wants");
assert_eq!(
orchestrator.not_started_jobs.len(),
1,
"should have scheduled job"
);
}
}
// Orchestrator needs to be able to achieve high level orchestration use cases.
/// Orchestrator needs to be able to achieve high level orchestration use cases.
mod orchestration {
// Use case: should run a job to produce a partition in reaction to a want, then have the
// want fulfilled.
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{PartitionStatusCode, WantCreateEventV1};
use std::thread;
use std::time::Duration;
/// Use case: should run a job to produce a partition in reaction to a want, then have the
/// want fulfilled.
#[test]
#[ignore]
fn test_want_builds_partition() {
todo!()
// Given
let mut orchestrator = setup_scenario_a_to_b(build_orchestrator());
// Add event for originating want
let partition = "data/alpha";
orchestrator
.bel
.append_event(&Event::WantCreateV1(WantCreateEventV1 {
partitions: vec![partition.into()],
..WantCreateEventV1::sample()
}))
.expect("event append");
// When
// Poll wants then schedule pending jobs
orchestrator.poll_wants().expect("stage unscheduled jobs based on wants failed");
assert_eq!(orchestrator.not_started_jobs.len(), 1);
// poll job runs should start job run
orchestrator.poll_job_runs().expect("should start run");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator.poll_job_runs().expect("should still be running");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
// Wait for it to complete
thread::sleep(Duration::from_millis(10));
orchestrator.poll_job_runs().expect("should be able to poll existing job run");
// Job run should have succeeded
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.failed_jobs.is_empty());
assert!(orchestrator.dep_miss_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
assert_eq!(orchestrator.completed_jobs.len(), 1);
// Build state should show partition as live
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionLive.into()),
"partition should be live after job run completion"
);
}
// Use case: a graph with multi-hop deps should achieve the multi-hop build
@ -258,7 +676,7 @@ mod tests {
JobConfiguration {
label: label.to_string(),
pattern: pattern.to_string(),
entrypoint: "test_entrypoint".to_string(),
entry_point: "test_entrypoint".to_string(),
}
}
@ -271,6 +689,7 @@ mod tests {
r#ref: r.to_string(),
})
.collect(),
upstreams: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,

View file

@ -0,0 +1,14 @@
load("@rules_rust//rust:defs.bzl", "rust_binary")
rust_binary(
name = "test_job_helper",
srcs = ["test_job_helper.rs"],
visibility = ["//visibility:public"],
)
sh_test(
name = "test_job_helper_test",
srcs = ["test_job_helper_test.sh"],
args = ["$(location :test_job_helper)"],
data = [":test_job_helper"],
)

View file

@ -0,0 +1,40 @@
// Simple test helper binary for testing DataBuild job runs
// Configurable via environment variables:
// - DATABUILD_TEST_OUTPUT_FILE: Path to write a marker file
// - DATABUILD_TEST_FILE_CONTENT: Contents to write to the marker file (default: "test executed")
// - DATABUILD_TEST_EXIT_CODE: Exit code to return (default: 0)
// - DATABUILD_TEST_STDOUT: Message to write to stdout (default: "test executed")
// - DATABUILD_TEST_SLEEP_MS: Milliseconds to sleep before exiting (default: 0)
use std::env;
use std::fs;
use std::thread;
use std::time::Duration;
fn main() {
// Optional: sleep for testing long-running jobs
if let Ok(sleep_ms) = env::var("DATABUILD_TEST_SLEEP_MS") {
if let Ok(ms) = sleep_ms.parse::<u64>() {
thread::sleep(Duration::from_millis(ms));
}
}
// Write to stdout (for capturing output)
let stdout_msg = env::var("DATABUILD_TEST_STDOUT")
.unwrap_or_else(|_| "test executed".to_string());
println!("{}", stdout_msg);
// Optionally write to a file (for verifying execution)
if let Ok(output_file) = env::var("DATABUILD_TEST_OUTPUT_FILE") {
let content = env::var("DATABUILD_TEST_FILE_CONTENT")
.unwrap_or_else(|_| "test executed".to_string());
fs::write(output_file, content).expect("Failed to write output file");
}
// Exit with configured exit code
let exit_code = env::var("DATABUILD_TEST_EXIT_CODE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
std::process::exit(exit_code);
}

View file

@ -0,0 +1,46 @@
#!/bin/bash
set -e
HELPER="$1"
# Test 1: Default behavior - exit code 0, default stdout
OUTPUT=$("$HELPER")
if [ "$OUTPUT" != "test executed" ]; then
echo "Test 1 failed: Expected 'test executed', got '$OUTPUT'"
exit 1
fi
echo "Test 1 passed: Default stdout"
# Test 2: Custom exit code
DATABUILD_TEST_EXIT_CODE=42 "$HELPER" || EXIT_CODE=$?
if [ "$EXIT_CODE" != "42" ]; then
echo "Test 2 failed: Expected exit code 42, got $EXIT_CODE"
exit 1
fi
echo "Test 2 passed: Custom exit code"
# Test 3: Custom stdout message
OUTPUT=$(DATABUILD_TEST_STDOUT="custom message" "$HELPER")
if [ "$OUTPUT" != "custom message" ]; then
echo "Test 3 failed: Expected 'custom message', got '$OUTPUT'"
exit 1
fi
echo "Test 3 passed: Custom stdout"
# Test 4: Write to file
TEMP_FILE=$(mktemp)
DATABUILD_TEST_OUTPUT_FILE="$TEMP_FILE" DATABUILD_TEST_FILE_CONTENT="hello world" "$HELPER" > /dev/null
if [ ! -f "$TEMP_FILE" ]; then
echo "Test 4 failed: Output file not created"
exit 1
fi
CONTENT=$(cat "$TEMP_FILE")
if [ "$CONTENT" != "hello world" ]; then
echo "Test 4 failed: Expected 'hello world' in file, got '$CONTENT'"
rm "$TEMP_FILE"
exit 1
fi
rm "$TEMP_FILE"
echo "Test 4 passed: Write to file"
echo "All tests passed!"

View file

@ -1,7 +1,75 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::backtrace::Backtrace;
pub fn current_timestamp() -> u64 {
let now = SystemTime::now();
let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
duration_since_epoch.as_nanos() as u64
}
fn maybe_backtrace() -> Backtrace {
// Only capture when RUST_BACKTRACE is set and not explicitly disabled
match std::env::var("RUST_BACKTRACE") {
Ok(v) if v != "0" => Backtrace::force_capture(),
_ => Backtrace::disabled(),
}
}
#[derive(Debug)]
pub struct DatabuildError {
msg: String,
source: Option<Box<dyn std::error::Error + Send + Sync>>,
backtrace: Backtrace,
}
impl DatabuildError {
fn new(msg: impl Into<String>) -> Self {
Self {
msg: msg.into(),
source: None,
backtrace: maybe_backtrace()
}
}
}
impl From<std::io::Error> for DatabuildError {
fn from(err: std::io::Error) -> Self {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
}
}
}
impl From<rusqlite::Error> for DatabuildError {
fn from(err: rusqlite::Error) -> Self {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
}
}
}
impl From<prost::EncodeError> for DatabuildError {
fn from(err: prost::EncodeError) -> Self {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
}
}
}
impl From<String> for DatabuildError {
fn from(value: String) -> Self {
Self::new(value)
}
}
impl std::fmt::Display for DatabuildError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.msg)
}
}
impl std::error::Error for DatabuildError {
// Expose the wrapped error so callers can walk the source chain
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match &self.source {
Some(e) => Some(&**e),
None => None,
}
}
}

2
docs/ideas/metadata.md Normal file
View file

@ -0,0 +1,2 @@
It would be cool to have user-defined partition/want/job-run metadata, and allow querying of this metadata. Basic example: adding a `run_url` to a job or `adls_location` to a partition. More advanced: adding a `dbx_cores` field to job runs, and using querying over job runs downstream from a want to control parallelism down to the number-of-cores-used level.
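A rough sketch of what that could look like (purely illustrative; none of these types or fields exist yet):

```rust
use std::collections::HashMap;

// Hypothetical: free-form, user-supplied metadata attached to a job run or partition.
pub struct UserMetadata(HashMap<String, String>);

impl UserMetadata {
    // e.g. metadata.get("dbx_cores") to drive core-level parallelism decisions,
    // or metadata.get("run_url") to link back to an external runner UI.
    pub fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).map(String::as_str)
    }
}
```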

16
docs/ideas/querying.md Normal file
View file

@ -0,0 +1,16 @@
Querying seems to be a fundamental factor of the problem. For instance:
- Upon canceling a want, canceling all wants it spawned, and the jobs attached to them.
- Answering the question, "what in-progress job runs were spawned by this want?"
- Answering, "why was this partition built?"
- Answering, "what partitions needed to be built and jobs run to fulfill this want?"
- Answering, "what jobs produce the partitions missed by this job run?"
Let's start prefixing functions that should probably be mostly queries with `query_`.
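As a sketch of that convention (illustrative only; the types below stand in for the real build state structures):

```rust
// Hypothetical stand-ins for the real build state types.
struct JobRunDetail {
    job_run_id: String,
    spawned_by_want: Option<String>,
}

struct BuildState {
    job_runs: Vec<JobRunDetail>,
}

impl BuildState {
    /// "What in-progress job runs were spawned by this want?"
    fn query_job_runs_for_want(&self, want_id: &str) -> Vec<&JobRunDetail> {
        self.job_runs
            .iter()
            .filter(|jr| jr.spawned_by_want.as_deref() == Some(want_id))
            .collect()
    }
}
```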
Notes on JTBDs and queries:
- When a want is schedulable (query), map the requested partitions to the job runs that create them (query), and start them
-

View file

@ -129,40 +129,6 @@ def parse_crate_specs(module_content):
crates[package] = crate_info
return crates
"""Extract crate specifications from MODULE.bazel content."""
crates = {}
# Find all crate.spec() calls
spec_pattern = r'crate\.spec\(\s*(.*?)\s*\)'
specs = re.findall(spec_pattern, module_content, re.DOTALL)
for spec in specs:
# Parse the spec parameters
package_match = re.search(r'package\s*=\s*"([^"]+)"', spec)
version_match = re.search(r'version\s*=\s*"([^"]+)"', spec)
features_match = re.search(r'features\s*=\s*\[(.*?)\]', spec, re.DOTALL)
default_features_match = re.search(r'default_features\s*=\s*False', spec)
if package_match and version_match:
package = package_match.group(1)
version = version_match.group(1)
crate_info = {"version": version}
# Handle features
if features_match:
features_str = features_match.group(1)
features = [f.strip().strip('"') for f in features_str.split(',') if f.strip()]
if features:
crate_info["features"] = features
# Handle default-features = false
if default_features_match:
crate_info["default-features"] = False
crates[package] = crate_info
return crates
def generate_cargo_toml(crates, structure, project_name="databuild"):
"""Generate Cargo.toml content from parsed crates and project structure."""
@ -170,7 +136,7 @@ def generate_cargo_toml(crates, structure, project_name="databuild"):
f'[package]',
f'name = "{project_name}"',
f'version = "0.1.0"',
f'edition = "2021"',
f'edition = "2024"',
f'',
f'# Generated from MODULE.bazel for IDE support only',
f'# Actual dependencies are managed by Bazel',