Remove event filter and add event versioning

This commit is contained in:
Stuart Axelbrooke 2025-09-13 17:51:16 -07:00
parent f7ac3c077e
commit 2009ac1c12
2 changed files with 42 additions and 44 deletions

View file

@ -1,5 +1,5 @@
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{BuildState, DataBuildEvent, EventFilter, WantDetail}; use crate::{BuildState, DataBuildEvent, WantDetail};
use std::error::Error; use std::error::Error;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -8,7 +8,6 @@ trait BELStorage {
fn list_events( fn list_events(
&self, &self,
since_idx: u64, since_idx: u64,
filter: EventFilter,
limit: u64, limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>; ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
} }
@ -35,21 +34,21 @@ impl<B: BELStorage> BuildEventLog<B> {
fn reduce(&mut self, event: Event) -> Result<(), Box<dyn Error>> { fn reduce(&mut self, event: Event) -> Result<(), Box<dyn Error>> {
let mut state = self.state.write().expect("lock poisoned"); let mut state = self.state.write().expect("lock poisoned");
match event { match event {
Event::JobRunBuffer(e) => {} Event::JobRunBufferV1(e) => {}
Event::JobRunQueue(e) => {} Event::JobRunQueueV1(e) => {}
Event::JobRunHeartbeat(e) => {} Event::JobRunHeartbeatV1(e) => {}
Event::JobRunSuccess(e) => {} Event::JobRunSuccessV1(e) => {}
Event::JobRunFailure(e) => {} Event::JobRunFailureV1(e) => {}
Event::JobRunCancel(e) => {} Event::JobRunCancelV1(e) => {}
Event::JobRunMissingDeps(e) => {} Event::JobRunMissingDepsV1(e) => {}
Event::WantCreate(e) => { Event::WantCreateV1(e) => {
state state
.wants .wants
.insert(e.want_id.clone(), WantDetail { want_id: e.want_id }); .insert(e.want_id.clone(), WantDetail { want_id: e.want_id });
} }
Event::WantCancel(e) => {} Event::WantCancelV1(e) => {}
Event::TaintCreate(e) => {} Event::TaintCreateV1(e) => {}
Event::TaintDelete(e) => {} Event::TaintDeleteV1(e) => {}
} }
Ok(()) Ok(())
@ -59,7 +58,7 @@ impl<B: BELStorage> BuildEventLog<B> {
mod tests { mod tests {
use crate::build_event_log::{BELStorage, BuildEventLog}; use crate::build_event_log::{BELStorage, BuildEventLog};
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{DataBuildEvent, EventFilter, PartitionRef, WantCreateEvent}; use crate::{DataBuildEvent, PartitionRef, WantCreateEventV1};
use std::error::Error; use std::error::Error;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
@ -92,10 +91,11 @@ mod tests {
fn list_events( fn list_events(
&self, &self,
since_idx: u64, since_idx: u64,
filter: EventFilter,
limit: u64, limit: u64,
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> { ) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
Ok(self.events.clone()) Ok(self.events.iter().cloned().filter(|e| e.timestamp > since_idx)
.take(limit as usize)
.collect())
} }
} }
@ -117,12 +117,12 @@ mod tests {
} }
// Given // Given
log.append_event(Event::WantCreate(WantCreateEvent { log.append_event(Event::WantCreateV1(WantCreateEventV1 {
want_id: want_id.clone(), want_id: want_id.clone(),
root_want_id: "123".to_string(), root_want_id: "123".to_string(),
parent_want_id: "123".to_string(), parent_want_id: "123".to_string(),
partitions: vec![PartitionRef { partitions: vec![PartitionRef {
r#ref: "".to_string(), r#ref: "1234".to_string(),
}], }],
data_timestamp: 0, data_timestamp: 0,
ttl_seconds: 1, ttl_seconds: 1,

View file

@ -14,19 +14,19 @@ message DataBuildEvent {
uint64 event_id = 2; uint64 event_id = 2;
oneof event { oneof event {
// Job run events // Job run events
JobRunBufferEvent job_run_buffer = 3; JobRunBufferEventV1 job_run_buffer_v1 = 10;
JobRunQueueEvent job_run_queue = 4; JobRunQueueEventV1 job_run_queue_v1 = 11;
JobRunHeartbeatEvent job_run_heartbeat = 5; JobRunHeartbeatEventV1 job_run_heartbeat_v1 = 12;
JobRunSuccessEvent job_run_success = 6; JobRunSuccessEventV1 job_run_success_v1 = 13;
JobRunFailureEvent job_run_failure = 7; JobRunFailureEventV1 job_run_failure_v1 = 14;
JobRunCancelEvent job_run_cancel = 8; JobRunCancelEventV1 job_run_cancel_v1 = 15;
JobRunMissingDepsEvent job_run_missing_deps = 9; JobRunMissingDepsEventV1 job_run_missing_deps_v1 = 16;
// Want events // Want events
WantCreateEvent want_create = 10; WantCreateEventV1 want_create_v1 = 17;
WantCancelEvent want_cancel = 11; WantCancelEventV1 want_cancel_v1 = 18;
// Taint events // Taint events
TaintCreateEvent taint_create = 12; TaintCreateEventV1 taint_create_v1 = 19;
TaintDeleteEvent taint_delete = 13; TaintDeleteEventV1 taint_delete_v1 = 20;
} }
} }
@ -47,7 +47,7 @@ enum EventSourceCode{
} }
// Indicates buffer state for job. // Indicates buffer state for job.
message JobRunBufferEvent { message JobRunBufferEventV1 {
string job_run_id = 1; string job_run_id = 1;
string job_label = 2; string job_label = 2;
repeated string servicing_want_ids = 3; repeated string servicing_want_ids = 3;
@ -55,31 +55,31 @@ message JobRunBufferEvent {
// TODO how do we handle buffer definition? Start simple, noop until we want something here? // TODO how do we handle buffer definition? Start simple, noop until we want something here?
} }
// Just indicates that job has entered queue // Just indicates that job has entered queue
message JobRunQueueEvent { message JobRunQueueEventV1 {
string job_run_id = 1; string job_run_id = 1;
} }
// Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In // Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In
// future it will also be used to enable job re-entrance. // future it will also be used to enable job re-entrance.
message JobRunHeartbeatEvent { message JobRunHeartbeatEventV1 {
string job_run_id = 1; string job_run_id = 1;
// TODO reentrance? // TODO reentrance?
} }
// Simply indicates that the job has succeeded. // Simply indicates that the job has succeeded.
message JobRunSuccessEvent { message JobRunSuccessEventV1 {
string job_run_id = 1; string job_run_id = 1;
} }
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry. // Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEvent { message JobRunFailureEventV1 {
string job_run_id = 1; string job_run_id = 1;
} }
// Job was explicitly canceled. // Job was explicitly canceled.
message JobRunCancelEvent { message JobRunCancelEventV1 {
string job_run_id = 1; string job_run_id = 1;
EventSource source = 2; EventSource source = 2;
optional string comment = 3; optional string comment = 3;
} }
// Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated. // Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated.
message JobRunMissingDepsEvent { message JobRunMissingDepsEventV1 {
string job_run_id = 1; string job_run_id = 1;
repeated MissingDeps missing_deps = 2; repeated MissingDeps missing_deps = 2;
} }
@ -90,7 +90,7 @@ message MissingDeps {
} }
message WantCreateEvent { message WantCreateEventV1 {
string want_id = 1; string want_id = 1;
string root_want_id = 2; string root_want_id = 2;
string parent_want_id = 3; string parent_want_id = 3;
@ -101,13 +101,13 @@ message WantCreateEvent {
EventSource source = 8; EventSource source = 8;
optional string comment = 9; optional string comment = 9;
} }
message WantCancelEvent { message WantCancelEventV1 {
string want_id = 1; string want_id = 1;
EventSource source = 2; EventSource source = 2;
optional string comment = 3; optional string comment = 3;
} }
message TaintCreateEvent { message TaintCreateEventV1 {
string taint_id = 1; string taint_id = 1;
string root_taint_id = 2; string root_taint_id = 2;
string parent_taint_id = 3; string parent_taint_id = 3;
@ -115,7 +115,7 @@ message TaintCreateEvent {
EventSource source = 5; EventSource source = 5;
optional string comment = 6; optional string comment = 6;
} }
message TaintDeleteEvent { message TaintDeleteEventV1 {
string taint_id = 1; string taint_id = 1;
EventSource source = 2; EventSource source = 2;
optional string comment = 3; optional string comment = 3;
@ -172,8 +172,6 @@ message JobRunDetail {
message EventFilter { message EventFilter {
repeated string partition_refs = 1; // Exact partition matches // IDs of wants to get relevant events for
repeated string partition_patterns = 2; // Glob patterns like "data/users/*" repeated string want_ids = 1;
repeated string job_labels = 3; // Job-specific events
repeated string job_run_ids = 4; // Job run events
} }