Compare commits

9 commits: c2bd4f230c ... b8cfdade16

| SHA1 |
|---|
| b8cfdade16 |
| 5484363e52 |
| 2be5b016eb |
| 9342ae6816 |
| 97ddb3ae28 |
| 526b826091 |
| 2edfe90fd4 |
| 2009ac1c12 |
| f7ac3c077e |
9 changed files with 1477 additions and 728 deletions
@@ -148,6 +148,10 @@ crate.spec(
    package = "chrono",
    version = "0.4",
)
crate.spec(
    package = "regex",
    version = "1.10",
)
crate.from_specs()
use_repo(crate, "crates")
1236  MODULE.bazel.lock
File diff suppressed because one or more lines are too long
@@ -21,7 +21,10 @@ rust_library(
    name = "databuild",
    srcs = [
        "build_event_log.rs",
        "job.rs",
        "job_run.rs",
        "lib.rs",
        "orchestrator.rs",
        ":generate_databuild_rust",
    ],
    edition = "2021",

@@ -37,6 +40,7 @@ rust_library(
        "@crates//:log",
        "@crates//:prost",
        "@crates//:prost-types",
        "@crates//:regex",
        "@crates//:rusqlite",
        "@crates//:schemars",
        "@crates//:serde",
@@ -1,108 +1,217 @@
use crate::data_build_event::Event;
use crate::{
    BuildState, DataBuildEvent, EventFilter, WantState,
};
use crate::{BuildState, DataBuildEvent, WantDetail};
use prost::Message;
use rusqlite::Connection;
use std::error::Error;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};

trait BELStorage {
pub trait BELStorage {
    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
    fn list_events(
        &self,
        since_idx: u64,
        filter: EventFilter,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
}

struct BuildEventLog<B: BELStorage> {
    storage: B,
    state: Arc<RwLock<BuildState>>,
#[derive(Debug)]
pub struct MemoryBELStorage {
    events: Vec<DataBuildEvent>,
}

impl<B: BELStorage> BuildEventLog<B> {
    fn create(storage: B) -> BuildEventLog<B> {
impl Default for MemoryBELStorage {
    fn default() -> Self {
        Self::new()
    }
}

impl MemoryBELStorage {
    pub fn new() -> MemoryBELStorage {
        MemoryBELStorage { events: vec![] }
    }
}

impl BELStorage for MemoryBELStorage {
    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
        let now = SystemTime::now();
        let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

        let timestamp = duration_since_epoch.as_nanos() as u64;

        let dbe = DataBuildEvent {
            timestamp,
            event_id: self.events.len() as u64,
            event: Some(event),
        };
        self.events.push(dbe);
        Ok(self.events.len() as u64)
    }

    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
        Ok(self
            .events
            .iter()
            .cloned()
            .filter(|e| e.timestamp > since_idx)
            .take(limit as usize)
            .collect())
    }
}

#[derive(Debug)]
struct SqliteBELStorage {
    connection: Connection,
}

impl SqliteBELStorage {
    fn create(database_url: &str) -> Result<SqliteBELStorage, Box<dyn Error>> {
        let connection = Connection::open(database_url)?;

        // Create the events table
        connection.execute(
            "CREATE TABLE IF NOT EXISTS events (
                event_id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp INTEGER NOT NULL,
                event_data BLOB NOT NULL
            )",
            (),
        )?;

        Ok(SqliteBELStorage { connection })
    }
}

impl BELStorage for SqliteBELStorage {
    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
        let now = SystemTime::now();
        let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
        let timestamp = duration_since_epoch.as_nanos() as u64;

        // Serialize the event using prost
        let dbe = DataBuildEvent {
            timestamp,
            event_id: 0, // Will be set by the database
            event: Some(event),
        };

        let mut buf = Vec::new();
        prost::Message::encode(&dbe, &mut buf)?;

        self.connection.execute(
            "INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
            (&timestamp, &buf),
        )?;

        let event_id = self.connection.last_insert_rowid() as u64;
        Ok(event_id)
    }

    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
        let mut stmt = self.connection.prepare(
            "SELECT event_id, timestamp, event_data FROM events
             WHERE timestamp > ?1
             ORDER BY event_id
             LIMIT ?2",
        )?;

        let rows = stmt.query_map([since_idx, limit], |row| {
            let event_id: u64 = row.get(0)?;
            let timestamp: u64 = row.get(1)?;
            let event_data: Vec<u8> = row.get(2)?;

            // Deserialize the event using prost
            let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|e| {
                rusqlite::Error::InvalidColumnType(
                    0,
                    "event_data".to_string(),
                    rusqlite::types::Type::Blob,
                )
            })?;

            // Update the event_id from the database
            dbe.event_id = event_id;
            dbe.timestamp = timestamp;

            let result: DataBuildEvent = dbe;

            Ok(result)
        })?;

        let mut events = Vec::new();
        for row_result in rows {
            events.push(row_result?);
        }

        Ok(events)
    }
}

#[derive(Debug)]
pub struct BuildEventLog<B: BELStorage + Debug> {
    pub storage: B,
    pub state: Arc<RwLock<BuildState>>,
}

impl<B: BELStorage + Debug> BuildEventLog<B> {
    pub fn new(storage: B) -> BuildEventLog<B> {
        BuildEventLog {
            storage,
            state: Arc::new(Default::default()),
        }
    }
}

impl<B: BELStorage> BuildEventLog<B> {
    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
        let idx = self.storage.append_event(event.clone())?;
        self.reduce(event);
    pub fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
        self.reduce(event.clone())?;
        let idx = self.storage.append_event(event)?;
        Ok(idx)
    }

    fn reduce(&mut self, event: Event) {
    fn reduce(&mut self, event: Event) -> Result<(), Box<dyn Error>> {
        let mut state = self.state.write().expect("lock poisoned");
        match event {
            Event::JobRunBuffer(e) => {}
            Event::JobRunQueue(_) => {}
            Event::JobRunStarted(_) => {}
            Event::JobRunHeartbeat(_) => {}
            Event::JobRunSuccess(_) => {}
            Event::JobRunFailure(_) => {}
            Event::JobRunCancel(_) => {}
            Event::JobRunMissingDeps(_) => {}
            Event::WantCreate(e) => {
                self.state
                    .write()
                    .expect("couldn't take write lock")
                    .wants
                    .insert(e.want_id.clone(), WantState { want_id: e.want_id });
            Event::JobRunBufferV1(e) => {}
            Event::JobRunQueueV1(e) => {}
            Event::JobRunHeartbeatV1(e) => {}
            Event::JobRunSuccessV1(e) => {}
            Event::JobRunFailureV1(e) => {}
            Event::JobRunCancelV1(e) => {}
            Event::JobRunMissingDepsV1(e) => {}
            Event::WantCreateV1(e) => {
                state.wants.insert(
                    e.want_id.clone(),
                    WantDetail {
                        want_id: e.want_id,
                        refs: e.partitions,
                    },
                );
            }
            Event::WantCancel(_) => {}
            Event::TaintCreate(_) => {}
            Event::TaintDelete(_) => {}
            Event::WantCancelV1(e) => {}
            Event::TaintCreateV1(e) => {}
            Event::TaintDeleteV1(e) => {}
        }

        Ok(())
    }

    pub fn schedulable_wants(&self) -> Vec<WantDetail> {
        todo!()
    }
}

mod tests {
    use crate::build_event_log::{BELStorage, BuildEventLog};
    use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage, SqliteBELStorage};
    use crate::data_build_event::Event;
    use crate::{DataBuildEvent, EventFilter, PartitionRef, WantCreateEvent};
    use std::error::Error;
    use std::time::{SystemTime, UNIX_EPOCH};

    struct TestBELStorage {
        events: Vec<DataBuildEvent>,
    }

    impl TestBELStorage {
        fn create() -> TestBELStorage {
            TestBELStorage { events: vec![] }
        }
    }

    impl BELStorage for TestBELStorage {
        fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
            let now = SystemTime::now();
            let duration_since_epoch = now.duration_since(UNIX_EPOCH)
                .expect("Time went backwards");

            let timestamp = duration_since_epoch.as_nanos() as u64;

            let dbe = DataBuildEvent {
                timestamp,
                event_id: self.events.len() as u64,
                event: Some(event),
            };
            self.events.push(dbe);
            Ok(self.events.len() as u64)
        }

        fn list_events(
            &self,
            since_idx: u64,
            filter: EventFilter,
            limit: u64,
        ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
            Ok(self.events.clone())
        }
    }
    use crate::{PartitionRef, WantCreateEventV1};

    #[test]
    fn test_hello() {

@@ -111,8 +220,8 @@ mod tests {

    #[test]
    fn test_append_event() {
        let storage = TestBELStorage::create();
        let mut log = BuildEventLog::create(storage);
        let storage = MemoryBELStorage::new();
        let mut log = BuildEventLog::new(storage);
        // Initial state
        assert_eq!(log.storage.events.len(), 0);
        let want_id = "1234".to_string();

@@ -122,12 +231,12 @@ mod tests {
        }

        // Given
        log.append_event(Event::WantCreate(WantCreateEvent {
        log.append_event(Event::WantCreateV1(WantCreateEventV1 {
            want_id: want_id.clone(),
            root_want_id: "123".to_string(),
            parent_want_id: "123".to_string(),
            partitions: vec![PartitionRef {
                r#ref: "".to_string(),
                r#ref: "1234".to_string(),
            }],
            data_timestamp: 0,
            ttl_seconds: 1,

@@ -151,4 +260,84 @@ mod tests {
            "want_id not equal",
        );
    }

    #[test]
    fn test_sqlite_append_event() {
        let storage =
            SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
        let mut log = BuildEventLog::new(storage);

        let want_id = "sqlite_test_1234".to_string();

        // Initial state - verify storage is empty
        let events = log
            .storage
            .list_events(0, 100)
            .expect("Failed to list events");
        assert_eq!(events.len(), 0);

        // Verify want doesn't exist in state
        {
            let state = log.state.read().unwrap();
            assert!(state.wants.get(&want_id).is_none());
        }

        // Append an event
        let event_id = log
            .append_event(Event::WantCreateV1(WantCreateEventV1 {
                want_id: want_id.clone(),
                root_want_id: "sqlite_root_123".to_string(),
                parent_want_id: "sqlite_parent_123".to_string(),
                partitions: vec![PartitionRef {
                    r#ref: "sqlite_partition_1234".to_string(),
                }],
                data_timestamp: 0,
                ttl_seconds: 1,
                sla_seconds: 1,
                source: None,
                comment: None,
            }))
            .expect("append_event failed");

        // Verify event was stored
        assert!(event_id > 0);

        // Verify event can be retrieved
        let events = log
            .storage
            .list_events(0, 100)
            .expect("Failed to list events");
        assert_eq!(events.len(), 1);

        let stored_event = &events[0];
        assert_eq!(stored_event.event_id, event_id);
        assert!(stored_event.timestamp > 0);

        // Verify the event content
        if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
            assert_eq!(want_event.want_id, want_id);
            assert_eq!(want_event.root_want_id, "sqlite_root_123");
            assert_eq!(want_event.parent_want_id, "sqlite_parent_123");
            assert_eq!(want_event.partitions.len(), 1);
            assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
        } else {
            panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
        }

        // Verify state was updated
        let state = log.state.read().expect("couldn't take read lock");
        assert!(
            state.wants.get(&want_id).is_some(),
            "want_id not found in state"
        );
        assert_eq!(
            state
                .wants
                .get(&want_id)
                .map(|want| want.want_id.clone())
                .expect("state.wants want_id not found"),
            want_id,
            "want_id not equal in state",
        );
    }
}
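Worth noting about the SqliteBELStorage path above: events are persisted as prost-encoded blobs and decoded on read. A minimal sketch of that encode/decode roundtrip, assuming only the crate's generated `DataBuildEvent` type (the helper name is illustrative):

```rust
use prost::Message;

// Encode an event to bytes the way append_event does, then decode it back
// the way list_events does. Encoding into a growable Vec cannot fail.
fn roundtrip(dbe: &DataBuildEvent) -> Result<DataBuildEvent, prost::DecodeError> {
    let mut buf = Vec::new();
    dbe.encode(&mut buf).expect("encoding into a Vec cannot fail");
    DataBuildEvent::decode(buf.as_slice())
}
```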
@@ -14,33 +14,83 @@ message DataBuildEvent {
  uint64 event_id = 2;
  oneof event {
    // Job run events
    JobRunBufferEvent job_run_buffer = 3;
    JobRunQueueEvent job_run_queue = 4;
    JobRunStartEvent job_run_started = 5;
    JobRunHeartbeatEvent job_run_heartbeat = 6;
    JobRunSuccessEvent job_run_success = 7;
    JobRunFailureEvent job_run_failure = 8;
    JobRunCancelEvent job_run_cancel = 9;
    JobRunMissingDepsEvent job_run_missing_deps = 10;
    JobRunBufferEventV1 job_run_buffer_v1 = 10;
    JobRunQueueEventV1 job_run_queue_v1 = 11;
    JobRunHeartbeatEventV1 job_run_heartbeat_v1 = 12;
    JobRunSuccessEventV1 job_run_success_v1 = 13;
    JobRunFailureEventV1 job_run_failure_v1 = 14;
    JobRunCancelEventV1 job_run_cancel_v1 = 15;
    JobRunMissingDepsEventV1 job_run_missing_deps_v1 = 16;
    // Want events
    WantCreateEvent want_create = 11;
    WantCancelEvent want_cancel = 12;
    WantCreateEventV1 want_create_v1 = 17;
    WantCancelEventV1 want_cancel_v1 = 18;
    // Taint events
    TaintCreateEvent taint_create = 13;
    TaintDeleteEvent taint_delete = 14;
    TaintCreateEventV1 taint_create_v1 = 19;
    TaintDeleteEventV1 taint_delete_v1 = 20;
  }
}

message JobRunBufferEvent {}
message JobRunQueueEvent {}
message JobRunStartEvent {}
message JobRunHeartbeatEvent {}
message JobRunSuccessEvent {}
message JobRunFailureEvent {}
message JobRunCancelEvent {}
message JobRunMissingDepsEvent {}
// Source metadata for user-driven events
message EventSource {
  // Revisit - how do we model this? See this chat: https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636
  EventSourceType source_type = 1;
  string source_name = 2;
}
message EventSourceType {
  EventSourceCode code = 1;
  string name = 2;
}
enum EventSourceCode{
  Manual = 0;
  Automated = 1;
  Propagated = 2;
}

message WantCreateEvent {
// Indicates buffer state for job.
message JobRunBufferEventV1 {
  string job_run_id = 1;
  string job_label = 2;
  repeated string servicing_want_ids = 3;
  repeated string producing_partition_refs = 4;
  // TODO how do we handle buffer definition? Start simple, noop until we want something here?
}
// Just indicates that job has entered queue
message JobRunQueueEventV1 {
  string job_run_id = 1;
}
// Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In
// future it will also be used to enable job re-entrance.
message JobRunHeartbeatEventV1 {
  string job_run_id = 1;
  // TODO reentrance?
}
// Simply indicates that the job has succeeded.
message JobRunSuccessEventV1 {
  string job_run_id = 1;
}
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEventV1 {
  string job_run_id = 1;
}
// Job was explicitly canceled.
message JobRunCancelEventV1 {
  string job_run_id = 1;
  EventSource source = 2;
  optional string comment = 3;
}
// Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated.
message JobRunMissingDepsEventV1 {
  string job_run_id = 1;
  repeated MissingDeps missing_deps = 2;
}
message MissingDeps {
  // The list of partition refs that are prevented from building by these missing deps (can be just 1)
  repeated PartitionRef impacted = 1;
  repeated PartitionRef missing = 2;
}

message WantCreateEventV1 {
  string want_id = 1;
  string root_want_id = 2;
  string parent_want_id = 3;

@@ -48,58 +98,52 @@ message WantCreateEvent {
  uint64 data_timestamp = 5;
  uint64 ttl_seconds = 6;
  uint64 sla_seconds = 7;
  WantSource source = 8;
  EventSource source = 8;
  optional string comment = 9;
}
message WantSource {
  WantSourceType source_type = 1;
  string source_name = 2;
}
message WantSourceType {
  WantSourceCode code = 1;
  string name = 2;
}
enum WantSourceCode{
  Manual = 0;
  Automated = 1;
  Propagated = 2;
}
message WantCancelEvent {
message WantCancelEventV1 {
  string want_id = 1;
  optional string reason = 2;
  EventSource source = 2;
  optional string comment = 3;
}

message TaintCreateEvent {
message TaintCreateEventV1 {
  string taint_id = 1;
  repeated PartitionRef partitions = 2;
  optional string reason = 3;
  string root_taint_id = 2;
  string parent_taint_id = 3;
  repeated PartitionRef partitions = 4;
  EventSource source = 5;
  optional string comment = 6;
}
message TaintDeleteEvent {
message TaintDeleteEventV1 {
  string taint_id = 1;
  optional string reason = 2;
  EventSource source = 2;
  optional string comment = 3;
}

// Build State

// Represents the whole state of the system
message BuildState {
  map<string, WantState> wants = 1;
  map<string, PartitionState> partitions = 2;
  map<string, TaintState> taints = 3;
  map<string, WantDetail> wants = 1;
  map<string, PartitionDetail> partitions = 2;
  map<string, TaintDetail> taints = 3;
  map<string, JobRunDetail> job_runs = 4;
}

message WantState {
message WantDetail {
  string want_id = 1;
  repeated PartitionRef refs = 2;
  // TODO
}

message PartitionState {
message PartitionDetail {
  // The partition reference
  PartitionRef ref = 1;
  // The partitions current status
  PartitionStatus status = 2;
  // The latest update to the partition's status
  optional uint64 last_updated_at = 3;
  optional uint64 last_updated_at = 3;
  // IDs that associate the partition with other objects
  repeated string job_run_ids = 4;
  repeated string want_ids = 5;

@@ -109,22 +153,26 @@ message PartitionStatus {
  PartitionStatusCode code = 1;
  string name = 2;
}
enum PartitionStatusCode{
enum PartitionStatusCode {
  // TODO how do we avoid copying job states here?
  Unknown = 0;
  Wanted = 1;
  Building = 2;
  Live = 3;
  Tainted = 4;
  Failed = 4;
  Tainted = 5;
}

message TaintState {

message TaintDetail {
  // TODO
}

message JobRunDetail {
  // TODO
}

message EventFilter {
  repeated string partition_refs = 1; // Exact partition matches
  repeated string partition_patterns = 2; // Glob patterns like "data/users/*"
  repeated string job_labels = 3; // Job-specific events
  repeated string job_run_ids = 4; // Job run events
  // IDs of wants to get relevant events for
  repeated string want_ids = 1;
}
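For orientation: prost lowers the `oneof event` above into the generated Rust enum `crate::data_build_event::Event` that build_event_log.rs matches on. A minimal sketch of consuming the versioned variants (the function itself is illustrative):

```rust
use crate::data_build_event::Event;

// Illustrative dispatch over a few generated oneof variants; a real
// reducer (like BuildEventLog::reduce) matches every variant.
fn describe(event: &Event) -> String {
    match event {
        Event::WantCreateV1(e) => format!("want {} created", e.want_id),
        Event::JobRunQueueV1(e) => format!("job run {} queued", e.job_run_id),
        _ => "other event".to_string(),
    }
}
```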
27  databuild/job.rs  Normal file

@@ -0,0 +1,27 @@
use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail};
use regex::Regex;
use std::error::Error;

#[derive(Debug, Clone)]
pub struct JobConfiguration {
    pub label: String,
    pub pattern: String,
    pub entrypoint: String,
}

impl JobConfiguration {
    /** Launch job to build the partitions specified by the provided wants. */
    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
        let wanted_refs: Vec<PartitionRef> =
            wants.iter().flat_map(|want| want.refs.clone()).collect();
        let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
        JobRun::spawn(self.entrypoint.clone(), args)
    }

    pub fn matches(&self, refs: &PartitionRef) -> bool {
        let regex =
            Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern));
        regex.is_match(&refs.r#ref)
    }
}
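A small usage sketch of the regex routing this file introduces; the label, pattern, and refs below are illustrative, not taken from the repo:

```rust
#[test]
fn matches_routes_partitions_by_pattern() {
    let job = JobConfiguration {
        label: "users_job".to_string(),
        pattern: "data/users/.*".to_string(),
        entrypoint: "./build_users.sh".to_string(),
    };
    // A ref matching the pattern is handled by this job...
    assert!(job.matches(&PartitionRef {
        r#ref: "data/users/2024-01-01".to_string(),
    }));
    // ...while a non-matching ref is not.
    assert!(!job.matches(&PartitionRef {
        r#ref: "data/orders/2024-01-01".to_string(),
    }));
}
```

Note that `matches` panics on an invalid pattern (the `expect` above), so pattern validity is effectively a configuration-time invariant.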
165  databuild/job_run.rs  Normal file

@@ -0,0 +1,165 @@
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::ops::{Deref, DerefMut};
use std::process::{Child, Command, ExitStatus, Stdio};
use uuid::Uuid;

/** Wrapper type that can be mocked */
trait JobRunChild {
    fn exit_status(&mut self) -> Option<ExitStatus>;
    fn stdout_lines(&mut self) -> Vec<String>;
}

#[derive(Debug)]
struct WrappedProcessChild(Child);

impl JobRunChild for WrappedProcessChild {
    fn exit_status(&mut self) -> Option<ExitStatus> {
        self.0.try_wait().expect("coudn't wait")
    }

    fn stdout_lines(&mut self) -> Vec<String> {
        let mut stdout_lines = Vec::new();
        let stdout = self.0.stdout.take().expect("stdout not piped");
        let reader = BufReader::new(stdout);
        for line in reader.lines() {
            stdout_lines.push(line.expect("stdout not piped"));
        }
        stdout_lines
    }
}

impl From<Child> for WrappedProcessChild {
    fn from(child: Child) -> Self {
        Self { 0: child }
    }
}

pub struct JobRun {
    job_run_id: Uuid,
    events: MemoryBELStorage,
    child: Box<dyn JobRunChild>,
    unhandled_lines: Vec<String>,
}

const EVENT_SIZE_LIMIT: u64 = 1000000;

impl JobRun {
    pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
        Ok(JobRun {
            job_run_id: Default::default(),
            events: Default::default(),
            child: Box::new(WrappedProcessChild::from(
                Command::new(command)
                    .args(args)
                    .stdout(Stdio::piped())
                    .spawn()?,
            )),
            unhandled_lines: Default::default(),
        })
    }

    pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
        // Collect new lines from child process

        // Parse BEL events from child process
        let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
        for event in new_events {
            self.events.append_event(event)?;
        }
        self.unhandled_lines.drain(..);

        // Potentially react to job completion
        match self.exit_status() {
            None => {} // No exit -> no harm
            Some(status) => {
                if status.success() {
                    self.events
                        .append_event(JobRunSuccessV1(JobRunSuccessEventV1 {
                            job_run_id: self.job_run_id.into(),
                        }))?;
                } else {
                    self.events
                        .append_event(JobRunFailureV1(JobRunFailureEventV1 {
                            job_run_id: self.job_run_id.into(),
                        }))?;
                }
            }
        }

        // Return BEL events since provided idx
        self.events
            .list_events(since_idx, EVENT_SIZE_LIMIT)
            .and_then(|events| {
                if events.len() as u64 == EVENT_SIZE_LIMIT {
                    Err(format!(
                        "Returned {} events - that's way too many.",
                        EVENT_SIZE_LIMIT
                    )
                    .into())
                } else {
                    Ok(events)
                }
            })
    }

    pub fn cancel(&mut self) {
        todo!()
    }

    pub fn event_for_line(line: String) -> Option<Event> {
        // TODO parse missing data dep event
        // TODO parse job state
        None
    }

    pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
        let mut events: Vec<Event> = Default::default();

        if lines.len() > 0 {
            // If any lines were written to stdout, we should heartbeat
            events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
                job_run_id: job_run_id.clone().into(),
            }));
        }

        for event in lines
            .iter()
            .flat_map(|line| Self::event_for_line(line.clone()))
        {
            events.push(event);
        }

        events
    }

    pub fn exit_status(&mut self) -> Option<ExitStatus> {
        self.child.exit_status()
    }
}

mod tests {
    use crate::job_run::JobRun;

    #[test]
    fn test_process_lines_empty() {
        let lines = Vec::<String>::new();
        let events = JobRun::process_lines(Default::default(), &lines);
        assert_eq!(events.len(), 0);
    }

    #[test]
    fn test_process_lines_heartbeat() {
        let lines_1 = vec!["Hello, salem".to_string()];
        let events_1 = JobRun::process_lines(Default::default(), &lines_1);
        assert_eq!(events_1.len(), 1);

        let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
        let events_2 = JobRun::process_lines(Default::default(), &lines_2);
        assert_eq!(events_2.len(), 1);
    }
}
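The `JobRunChild` doc comment says the wrapper exists so the child process can be mocked; a sketch of what such a test double could look like, assuming the trait stays as declared above (`MockChild` is hypothetical):

```rust
use std::process::ExitStatus;

// Hypothetical test double: returns canned stdout lines and a fixed exit
// status instead of polling a real OS process.
struct MockChild {
    status: Option<ExitStatus>,
    lines: Vec<String>,
}

impl JobRunChild for MockChild {
    fn exit_status(&mut self) -> Option<ExitStatus> {
        self.status
    }

    fn stdout_lines(&mut self) -> Vec<String> {
        // Drain so repeated visits observe each line only once.
        std::mem::take(&mut self.lines)
    }
}
```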
@@ -1,4 +1,7 @@
mod build_event_log;
mod orchestrator;
mod job_run;
mod job;

// Include generated protobuf code
include!("databuild.rs");
261  databuild/orchestrator.rs  Normal file

@@ -0,0 +1,261 @@
use crate::build_event_log::{BELStorage, BuildEventLog};
use crate::job::JobConfiguration;
use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;

/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
the visitor pattern to monitor job exec progress and liveness, and adds
*/

struct Orchestrator<B: BELStorage + Debug> {
    bel: BuildEventLog<B>,
    job_runs: Vec<JobRunHandle>,
    config: OrchestratorConfig,
}

struct JobRunHandle {
    job_run: JobRun,
    bel_idx: u64,
}

impl From<JobRun> for JobRunHandle {
    fn from(job_run: JobRun) -> Self {
        Self {
            job_run,
            bel_idx: 0,
        }
    }
}

#[derive(Debug, Clone)]
struct OrchestratorConfig {
    jobs: Vec<JobConfiguration>,
}

impl OrchestratorConfig {
    fn job_configuration_for_label(&self, label: &str) -> Option<JobConfiguration> {
        self.jobs.iter().find(|job| job.label == label).cloned()
    }

    fn match_job_partition(&self, pref: &PartitionRef) -> Option<JobConfiguration> {
        self.jobs.iter().find(|job| job.matches(pref)).cloned()
    }
}

#[derive(Debug, Clone)]
struct WantGroup {
    job: JobConfiguration,
    wants: Vec<WantDetail>,
}

#[derive(Debug, Clone)]
struct GroupedWants {
    want_groups: Vec<WantGroup>,
    unhandled_wants: Vec<WantDetail>,
}

impl<B: BELStorage + Debug> Orchestrator<B> {
    fn new(storage: B, config: OrchestratorConfig) -> Self {
        Self {
            bel: BuildEventLog::new(storage),
            job_runs: Vec::new(),
            config,
        }
    }

    /** Continuously invoked function to watch job run status */
    fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
        // Visit existing jobs, remove completed
        self.job_runs.retain_mut(|jr| {
            // Append emitted events
            let events = jr
                .job_run
                .visit(jr.bel_idx.clone())
                .expect("Job visit failed");
            events
                .iter()
                .filter_map(|event| event.event.clone())
                .for_each(|event| {
                    self.bel
                        .append_event(event.clone())
                        .expect("Failed to append event");
                });

            // Retain job run if it doesn't yet have an exit code (still running)
            jr.job_run.exit_status().is_none()
        });

        Ok(())
    }

    /** Continuously invoked function to watch wants and schedule new jobs */
    fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
        // Collect unhandled wants, group by job that handles each partition,
        let grouped_wants =
            Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());

        if !grouped_wants.want_groups.is_empty() {
            // All wants must be mapped to jobs that can be handled
            // TODO we probably want to handle this gracefully in the near future
            Err(format!(
                "Unable to map following wants: {:?}",
                &grouped_wants.want_groups
            )
            .into())
        } else {
            for wg in grouped_wants.want_groups {
                let job_run = wg.job.spawn(wg.wants)?;
                self.job_runs.push(JobRunHandle::from(job_run));
            }

            Ok(())
        }
    }

    fn group_wants(config: &OrchestratorConfig, wants: &Vec<WantDetail>) -> GroupedWants {
        let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
        let mut unhandled_wants: Vec<WantDetail> = Default::default();
        wants.iter().for_each(|want| {
            want.refs.iter().for_each(|pref| {
                let matched_job = config.match_job_partition(pref);
                match matched_job {
                    None => unhandled_wants.push(want.clone()),
                    Some(jc) => want_groups.entry(jc.label).or_default().push(want.clone()),
                }
            });
        });
        GroupedWants {
            want_groups: want_groups
                .iter()
                .map(|(k, v)| WantGroup {
                    job: config
                        .job_configuration_for_label(k)
                        .expect(&format!("Job configuration not found for label `{}`", k)),
                    wants: v.to_owned(),
                })
                .collect(),
            unhandled_wants,
        }
    }

    /** Entrypoint for running jobs */
    pub fn join(mut self) -> Result<(), Box<dyn Error>> {
        loop {
            self.poll_jobs()?;
            self.poll_wants()?;
        }
    }
}

#[cfg(test)]
mod tests {
    mod want_group {
        use super::super::*;
        use crate::build_event_log::MemoryBELStorage;
        use crate::{PartitionRef, WantDetail};

        fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
            JobConfiguration {
                label: label.to_string(),
                pattern: pattern.to_string(),
                entrypoint: "test_entrypoint".to_string(),
            }
        }

        fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
            WantDetail {
                want_id: want_id.to_string(),
                refs: partition_refs
                    .iter()
                    .map(|r| PartitionRef {
                        r#ref: r.to_string(),
                    })
                    .collect(),
            }
        }

        #[test]
        fn test_group_wants_empty_config_empty_wants() {
            let config = OrchestratorConfig { jobs: vec![] };
            let wants = vec![];

            let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);

            assert!(result.want_groups.is_empty());
            assert!(result.unhandled_wants.is_empty());
        }

        #[test]
        fn test_group_wants_one_want_matches_job() {
            let job_config = create_job_config("test_job", "partition.*");
            let config = OrchestratorConfig {
                jobs: vec![job_config.clone()],
            };
            let want = create_want_detail("want1", vec!["partition1"]);
            let wants = vec![want.clone()];

            let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);

            assert!(result.unhandled_wants.is_empty());
            assert_eq!(result.want_groups.len(), 1);
            assert_eq!(result.want_groups[0].job.label, "test_job");
            assert_eq!(result.want_groups[0].wants.len(), 1);
            assert_eq!(result.want_groups[0].wants[0].want_id, "want1");
        }

        #[test]
        fn test_group_wants_one_unmatching_want() {
            let job_config = create_job_config("test_job", "^test_pattern$");
            let config = OrchestratorConfig {
                jobs: vec![job_config],
            };
            let want = create_want_detail("want1", vec!["different_partition"]);
            let wants = vec![want.clone()];

            let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);

            assert_eq!(result.unhandled_wants.len(), 1);
            assert_eq!(result.unhandled_wants[0].want_id, "want1");
            assert!(result.want_groups.is_empty());
        }

        #[test]
        fn test_group_wants_multiple_wants_different_jobs() {
            let job_config1 = create_job_config("job1", "pattern1.*");
            let job_config2 = create_job_config("job2", "pattern2.*");
            let config = OrchestratorConfig {
                jobs: vec![job_config1, job_config2],
            };

            let want1 = create_want_detail("want1", vec!["pattern1_partition"]);
            let want2 = create_want_detail("want2", vec!["pattern1_other"]);
            let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
            let wants = vec![want1, want2, want3];

            let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);

            assert!(result.unhandled_wants.is_empty());
            assert_eq!(result.want_groups.len(), 2);

            // Find job1 group
            let job1_group = result
                .want_groups
                .iter()
                .find(|wg| wg.job.label == "job1")
                .unwrap();
            assert_eq!(job1_group.wants.len(), 2);

            // Find job2 group
            let job2_group = result
                .want_groups
                .iter()
                .find(|wg| wg.job.label == "job2")
                .unwrap();
            assert_eq!(job2_group.wants.len(), 1);
        }
    }
}
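A sketch of how the orchestrator might be driven end to end. `Orchestrator` and `OrchestratorConfig` are private in this diff, so this would live inside the crate; the job values are illustrative, and `schedulable_wants` is still `todo!()`, so the loop would panic today:

```rust
use crate::build_event_log::MemoryBELStorage;

fn run() -> Result<(), Box<dyn std::error::Error>> {
    let config = OrchestratorConfig {
        jobs: vec![JobConfiguration {
            label: "users_job".to_string(),
            pattern: "data/users/.*".to_string(),
            entrypoint: "./build_users.sh".to_string(),
        }],
    };
    // join() alternates poll_jobs (visit running jobs, fold their events
    // into the BEL) and poll_wants (group schedulable wants by matching
    // job and spawn them) forever.
    Orchestrator::new(MemoryBELStorage::new(), config).join()
}
```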