BEL events proto updates

This commit is contained in:
Stuart Axelbrooke 2025-09-13 16:50:57 -07:00
parent c2bd4f230c
commit f7ac3c077e
2 changed files with 114 additions and 70 deletions

View file

@ -1,7 +1,5 @@
use crate::data_build_event::Event;
use crate::{
BuildState, DataBuildEvent, EventFilter, WantState,
};
use crate::{BuildState, DataBuildEvent, EventFilter, WantDetail};
use std::error::Error;
use std::sync::{Arc, RwLock};
@ -27,36 +25,34 @@ impl<B: BELStorage> BuildEventLog<B> {
state: Arc::new(Default::default()),
}
}
}
impl<B: BELStorage> BuildEventLog<B> {
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
let idx = self.storage.append_event(event.clone())?;
self.reduce(event);
self.reduce(event.clone())?;
let idx = self.storage.append_event(event)?;
Ok(idx)
}
fn reduce(&mut self, event: Event) {
fn reduce(&mut self, event: Event) -> Result<(), Box<dyn Error>> {
let mut state = self.state.write().expect("lock poisoned");
match event {
Event::JobRunBuffer(e) => {}
Event::JobRunQueue(_) => {}
Event::JobRunStarted(_) => {}
Event::JobRunHeartbeat(_) => {}
Event::JobRunSuccess(_) => {}
Event::JobRunFailure(_) => {}
Event::JobRunCancel(_) => {}
Event::JobRunMissingDeps(_) => {}
Event::JobRunQueue(e) => {}
Event::JobRunHeartbeat(e) => {}
Event::JobRunSuccess(e) => {}
Event::JobRunFailure(e) => {}
Event::JobRunCancel(e) => {}
Event::JobRunMissingDeps(e) => {}
Event::WantCreate(e) => {
self.state
.write()
.expect("couldn't take write lock")
state
.wants
.insert(e.want_id.clone(), WantState { want_id: e.want_id });
.insert(e.want_id.clone(), WantDetail { want_id: e.want_id });
}
Event::WantCancel(_) => {}
Event::TaintCreate(_) => {}
Event::TaintDelete(_) => {}
Event::WantCancel(e) => {}
Event::TaintCreate(e) => {}
Event::TaintDelete(e) => {}
}
Ok(())
}
}
@ -80,8 +76,7 @@ mod tests {
impl BELStorage for TestBELStorage {
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
let now = SystemTime::now();
let duration_since_epoch = now.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
let timestamp = duration_since_epoch.as_nanos() as u64;

View file

@ -16,29 +16,79 @@ message DataBuildEvent {
// Job run events
JobRunBufferEvent job_run_buffer = 3;
JobRunQueueEvent job_run_queue = 4;
JobRunStartEvent job_run_started = 5;
JobRunHeartbeatEvent job_run_heartbeat = 6;
JobRunSuccessEvent job_run_success = 7;
JobRunFailureEvent job_run_failure = 8;
JobRunCancelEvent job_run_cancel = 9;
JobRunMissingDepsEvent job_run_missing_deps = 10;
JobRunHeartbeatEvent job_run_heartbeat = 5;
JobRunSuccessEvent job_run_success = 6;
JobRunFailureEvent job_run_failure = 7;
JobRunCancelEvent job_run_cancel = 8;
JobRunMissingDepsEvent job_run_missing_deps = 9;
// Want events
WantCreateEvent want_create = 11;
WantCancelEvent want_cancel = 12;
WantCreateEvent want_create = 10;
WantCancelEvent want_cancel = 11;
// Taint events
TaintCreateEvent taint_create = 13;
TaintDeleteEvent taint_delete = 14;
TaintCreateEvent taint_create = 12;
TaintDeleteEvent taint_delete = 13;
}
}
message JobRunBufferEvent {}
message JobRunQueueEvent {}
message JobRunStartEvent {}
message JobRunHeartbeatEvent {}
message JobRunSuccessEvent {}
message JobRunFailureEvent {}
message JobRunCancelEvent {}
message JobRunMissingDepsEvent {}
// Source metadata for user-driven events
message EventSource {
// Revisit - how do we model this? See this chat: https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636
EventSourceType source_type = 1;
string source_name = 2;
}
message EventSourceType {
EventSourceCode code = 1;
string name = 2;
}
enum EventSourceCode{
Manual = 0;
Automated = 1;
Propagated = 2;
}
// Indicates buffer state for job.
message JobRunBufferEvent {
string job_run_id = 1;
string job_label = 2;
repeated string servicing_want_ids = 3;
repeated string producing_partition_refs = 4;
// TODO how do we handle buffer definition? Start simple, noop until we want something here?
}
// Just indicates that job has entered queue
message JobRunQueueEvent {
string job_run_id = 1;
}
// Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In
// future it will also be used to enable job re-entrance.
message JobRunHeartbeatEvent {
string job_run_id = 1;
// TODO reentrance?
}
// Simply indicates that the job has succeeded.
message JobRunSuccessEvent {
string job_run_id = 1;
}
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEvent {
string job_run_id = 1;
}
// Job was explicitly canceled.
message JobRunCancelEvent {
string job_run_id = 1;
EventSource source = 2;
optional string comment = 3;
}
// Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated.
message JobRunMissingDepsEvent {
string job_run_id = 1;
repeated MissingDeps missing_deps = 2;
}
message MissingDeps {
// The list of partition refs that are prevented from building by these missing deps (can be just 1)
repeated PartitionRef impacted = 1;
repeated PartitionRef missing = 2;
}
message WantCreateEvent {
string want_id = 1;
@ -48,58 +98,51 @@ message WantCreateEvent {
uint64 data_timestamp = 5;
uint64 ttl_seconds = 6;
uint64 sla_seconds = 7;
WantSource source = 8;
EventSource source = 8;
optional string comment = 9;
}
message WantSource {
WantSourceType source_type = 1;
string source_name = 2;
}
message WantSourceType {
WantSourceCode code = 1;
string name = 2;
}
enum WantSourceCode{
Manual = 0;
Automated = 1;
Propagated = 2;
}
message WantCancelEvent {
string want_id = 1;
optional string reason = 2;
EventSource source = 2;
optional string comment = 3;
}
message TaintCreateEvent {
string taint_id = 1;
repeated PartitionRef partitions = 2;
optional string reason = 3;
string root_taint_id = 2;
string parent_taint_id = 3;
repeated PartitionRef partitions = 4;
EventSource source = 5;
optional string comment = 6;
}
message TaintDeleteEvent {
string taint_id = 1;
optional string reason = 2;
EventSource source = 2;
optional string comment = 3;
}
// Build State
// Represents the whole state of the system
message BuildState {
map<string, WantState> wants = 1;
map<string, PartitionState> partitions = 2;
map<string, TaintState> taints = 3;
map<string, WantDetail> wants = 1;
map<string, PartitionDetail> partitions = 2;
map<string, TaintDetail> taints = 3;
map<string, JobRunDetail> job_runs = 4;
}
message WantState {
message WantDetail {
string want_id = 1;
// TODO
}
message PartitionState {
message PartitionDetail {
// The partition reference
PartitionRef ref = 1;
// The partitions current status
PartitionStatus status = 2;
// The latest update to the partition's status
optional uint64 last_updated_at = 3;
optional uint64 last_updated_at = 3;
// IDs that associate the partition with other objects
repeated string job_run_ids = 4;
repeated string want_ids = 5;
@ -109,16 +152,22 @@ message PartitionStatus {
PartitionStatusCode code = 1;
string name = 2;
}
enum PartitionStatusCode{
enum PartitionStatusCode {
// TODO how do we avoid copying job states here?
Unknown = 0;
Wanted = 1;
Building = 2;
Live = 3;
Tainted = 4;
Failed = 4;
Tainted = 5;
}
message TaintState {
message TaintDetail {
// TODO
}
message JobRunDetail {
// TODO
}