diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index ad4741f..aad6fc9 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -1,5 +1,5 @@ use crate::data_build_event::Event; -use crate::{BuildState, DataBuildEvent, EventFilter, WantDetail}; +use crate::{BuildState, DataBuildEvent, WantDetail}; use std::error::Error; use std::sync::{Arc, RwLock}; @@ -8,7 +8,6 @@ trait BELStorage { fn list_events( &self, since_idx: u64, - filter: EventFilter, limit: u64, ) -> Result, Box>; } @@ -35,21 +34,21 @@ impl BuildEventLog { fn reduce(&mut self, event: Event) -> Result<(), Box> { let mut state = self.state.write().expect("lock poisoned"); match event { - Event::JobRunBuffer(e) => {} - Event::JobRunQueue(e) => {} - Event::JobRunHeartbeat(e) => {} - Event::JobRunSuccess(e) => {} - Event::JobRunFailure(e) => {} - Event::JobRunCancel(e) => {} - Event::JobRunMissingDeps(e) => {} - Event::WantCreate(e) => { + Event::JobRunBufferV1(e) => {} + Event::JobRunQueueV1(e) => {} + Event::JobRunHeartbeatV1(e) => {} + Event::JobRunSuccessV1(e) => {} + Event::JobRunFailureV1(e) => {} + Event::JobRunCancelV1(e) => {} + Event::JobRunMissingDepsV1(e) => {} + Event::WantCreateV1(e) => { state .wants .insert(e.want_id.clone(), WantDetail { want_id: e.want_id }); } - Event::WantCancel(e) => {} - Event::TaintCreate(e) => {} - Event::TaintDelete(e) => {} + Event::WantCancelV1(e) => {} + Event::TaintCreateV1(e) => {} + Event::TaintDeleteV1(e) => {} } Ok(()) @@ -59,7 +58,7 @@ impl BuildEventLog { mod tests { use crate::build_event_log::{BELStorage, BuildEventLog}; use crate::data_build_event::Event; - use crate::{DataBuildEvent, EventFilter, PartitionRef, WantCreateEvent}; + use crate::{DataBuildEvent, PartitionRef, WantCreateEventV1}; use std::error::Error; use std::time::{SystemTime, UNIX_EPOCH}; @@ -92,10 +91,11 @@ mod tests { fn list_events( &self, since_idx: u64, - filter: EventFilter, limit: u64, ) -> Result, Box> { - Ok(self.events.clone()) + Ok(self.events.iter().cloned().filter(|e| e.timestamp > since_idx) + .take(limit as usize) + .collect()) } } @@ -117,12 +117,12 @@ mod tests { } // Given - log.append_event(Event::WantCreate(WantCreateEvent { + log.append_event(Event::WantCreateV1(WantCreateEventV1 { want_id: want_id.clone(), root_want_id: "123".to_string(), parent_want_id: "123".to_string(), partitions: vec![PartitionRef { - r#ref: "".to_string(), + r#ref: "1234".to_string(), }], data_timestamp: 0, ttl_seconds: 1, diff --git a/databuild/databuild.proto b/databuild/databuild.proto index ee28454..649603b 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -14,19 +14,19 @@ message DataBuildEvent { uint64 event_id = 2; oneof event { // Job run events - JobRunBufferEvent job_run_buffer = 3; - JobRunQueueEvent job_run_queue = 4; - JobRunHeartbeatEvent job_run_heartbeat = 5; - JobRunSuccessEvent job_run_success = 6; - JobRunFailureEvent job_run_failure = 7; - JobRunCancelEvent job_run_cancel = 8; - JobRunMissingDepsEvent job_run_missing_deps = 9; + JobRunBufferEventV1 job_run_buffer_v1 = 10; + JobRunQueueEventV1 job_run_queue_v1 = 11; + JobRunHeartbeatEventV1 job_run_heartbeat_v1 = 12; + JobRunSuccessEventV1 job_run_success_v1 = 13; + JobRunFailureEventV1 job_run_failure_v1 = 14; + JobRunCancelEventV1 job_run_cancel_v1 = 15; + JobRunMissingDepsEventV1 job_run_missing_deps_v1 = 16; // Want events - WantCreateEvent want_create = 10; - WantCancelEvent want_cancel = 11; + WantCreateEventV1 want_create_v1 = 17; + WantCancelEventV1 want_cancel_v1 = 18; // Taint events - TaintCreateEvent taint_create = 12; - TaintDeleteEvent taint_delete = 13; + TaintCreateEventV1 taint_create_v1 = 19; + TaintDeleteEventV1 taint_delete_v1 = 20; } } @@ -47,7 +47,7 @@ enum EventSourceCode{ } // Indicates buffer state for job. -message JobRunBufferEvent { +message JobRunBufferEventV1 { string job_run_id = 1; string job_label = 2; repeated string servicing_want_ids = 3; @@ -55,31 +55,31 @@ message JobRunBufferEvent { // TODO how do we handle buffer definition? Start simple, noop until we want something here? } // Just indicates that job has entered queue -message JobRunQueueEvent { +message JobRunQueueEventV1 { string job_run_id = 1; } // Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In // future it will also be used to enable job re-entrance. -message JobRunHeartbeatEvent { +message JobRunHeartbeatEventV1 { string job_run_id = 1; // TODO reentrance? } // Simply indicates that the job has succeeded. -message JobRunSuccessEvent { +message JobRunSuccessEventV1 { string job_run_id = 1; } // Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry. -message JobRunFailureEvent { +message JobRunFailureEventV1 { string job_run_id = 1; } // Job was explicitly canceled. -message JobRunCancelEvent { +message JobRunCancelEventV1 { string job_run_id = 1; EventSource source = 2; optional string comment = 3; } // Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated. -message JobRunMissingDepsEvent { +message JobRunMissingDepsEventV1 { string job_run_id = 1; repeated MissingDeps missing_deps = 2; } @@ -90,7 +90,7 @@ message MissingDeps { } -message WantCreateEvent { +message WantCreateEventV1 { string want_id = 1; string root_want_id = 2; string parent_want_id = 3; @@ -101,13 +101,13 @@ message WantCreateEvent { EventSource source = 8; optional string comment = 9; } -message WantCancelEvent { +message WantCancelEventV1 { string want_id = 1; EventSource source = 2; optional string comment = 3; } -message TaintCreateEvent { +message TaintCreateEventV1 { string taint_id = 1; string root_taint_id = 2; string parent_taint_id = 3; @@ -115,7 +115,7 @@ message TaintCreateEvent { EventSource source = 5; optional string comment = 6; } -message TaintDeleteEvent { +message TaintDeleteEventV1 { string taint_id = 1; EventSource source = 2; optional string comment = 3; @@ -172,8 +172,6 @@ message JobRunDetail { message EventFilter { - repeated string partition_refs = 1; // Exact partition matches - repeated string partition_patterns = 2; // Glob patterns like "data/users/*" - repeated string job_labels = 3; // Job-specific events - repeated string job_run_ids = 4; // Job run events + // IDs of wants to get relevant events for + repeated string want_ids = 1; }