diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index 464343e..ad4741f 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -1,7 +1,5 @@ use crate::data_build_event::Event; -use crate::{ - BuildState, DataBuildEvent, EventFilter, WantState, -}; +use crate::{BuildState, DataBuildEvent, EventFilter, WantDetail}; use std::error::Error; use std::sync::{Arc, RwLock}; @@ -27,36 +25,34 @@ impl BuildEventLog { state: Arc::new(Default::default()), } } -} -impl BuildEventLog { fn append_event(&mut self, event: Event) -> Result> { - let idx = self.storage.append_event(event.clone())?; - self.reduce(event); + self.reduce(event.clone())?; + let idx = self.storage.append_event(event)?; Ok(idx) } - fn reduce(&mut self, event: Event) { + fn reduce(&mut self, event: Event) -> Result<(), Box> { + let mut state = self.state.write().expect("lock poisoned"); match event { Event::JobRunBuffer(e) => {} - Event::JobRunQueue(_) => {} - Event::JobRunStarted(_) => {} - Event::JobRunHeartbeat(_) => {} - Event::JobRunSuccess(_) => {} - Event::JobRunFailure(_) => {} - Event::JobRunCancel(_) => {} - Event::JobRunMissingDeps(_) => {} + Event::JobRunQueue(e) => {} + Event::JobRunHeartbeat(e) => {} + Event::JobRunSuccess(e) => {} + Event::JobRunFailure(e) => {} + Event::JobRunCancel(e) => {} + Event::JobRunMissingDeps(e) => {} Event::WantCreate(e) => { - self.state - .write() - .expect("couldn't take write lock") + state .wants - .insert(e.want_id.clone(), WantState { want_id: e.want_id }); + .insert(e.want_id.clone(), WantDetail { want_id: e.want_id }); } - Event::WantCancel(_) => {} - Event::TaintCreate(_) => {} - Event::TaintDelete(_) => {} + Event::WantCancel(e) => {} + Event::TaintCreate(e) => {} + Event::TaintDelete(e) => {} } + + Ok(()) } } @@ -80,8 +76,7 @@ mod tests { impl BELStorage for TestBELStorage { fn append_event(&mut self, event: Event) -> Result> { let now = SystemTime::now(); - let duration_since_epoch = now.duration_since(UNIX_EPOCH) - .expect("Time went backwards"); + let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); let timestamp = duration_since_epoch.as_nanos() as u64; diff --git a/databuild/databuild.proto b/databuild/databuild.proto index e6f499e..ee28454 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -16,29 +16,79 @@ message DataBuildEvent { // Job run events JobRunBufferEvent job_run_buffer = 3; JobRunQueueEvent job_run_queue = 4; - JobRunStartEvent job_run_started = 5; - JobRunHeartbeatEvent job_run_heartbeat = 6; - JobRunSuccessEvent job_run_success = 7; - JobRunFailureEvent job_run_failure = 8; - JobRunCancelEvent job_run_cancel = 9; - JobRunMissingDepsEvent job_run_missing_deps = 10; + JobRunHeartbeatEvent job_run_heartbeat = 5; + JobRunSuccessEvent job_run_success = 6; + JobRunFailureEvent job_run_failure = 7; + JobRunCancelEvent job_run_cancel = 8; + JobRunMissingDepsEvent job_run_missing_deps = 9; // Want events - WantCreateEvent want_create = 11; - WantCancelEvent want_cancel = 12; + WantCreateEvent want_create = 10; + WantCancelEvent want_cancel = 11; // Taint events - TaintCreateEvent taint_create = 13; - TaintDeleteEvent taint_delete = 14; + TaintCreateEvent taint_create = 12; + TaintDeleteEvent taint_delete = 13; } } -message JobRunBufferEvent {} -message JobRunQueueEvent {} -message JobRunStartEvent {} -message JobRunHeartbeatEvent {} -message JobRunSuccessEvent {} -message JobRunFailureEvent {} -message JobRunCancelEvent {} -message JobRunMissingDepsEvent {} +// Source metadata for user-driven events +message EventSource { + // Revisit - how do we model this? See this chat: https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636 + EventSourceType source_type = 1; + string source_name = 2; +} +message EventSourceType { + EventSourceCode code = 1; + string name = 2; +} +enum EventSourceCode{ + Manual = 0; + Automated = 1; + Propagated = 2; +} + +// Indicates buffer state for job. +message JobRunBufferEvent { + string job_run_id = 1; + string job_label = 2; + repeated string servicing_want_ids = 3; + repeated string producing_partition_refs = 4; + // TODO how do we handle buffer definition? Start simple, noop until we want something here? +} +// Just indicates that job has entered queue +message JobRunQueueEvent { + string job_run_id = 1; +} +// Emitted immediately on job spawn, and periodically by job to indicate job health when heartbeating is required. In +// future it will also be used to enable job re-entrance. +message JobRunHeartbeatEvent { + string job_run_id = 1; + // TODO reentrance? +} +// Simply indicates that the job has succeeded. +message JobRunSuccessEvent { + string job_run_id = 1; +} +// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry. +message JobRunFailureEvent { + string job_run_id = 1; +} +// Job was explicitly canceled. +message JobRunCancelEvent { + string job_run_id = 1; + EventSource source = 2; + optional string comment = 3; +} +// Job indicating that required deps are missing, listing upstreams -> impacted outputs so that wants can be propagated. +message JobRunMissingDepsEvent { + string job_run_id = 1; + repeated MissingDeps missing_deps = 2; +} +message MissingDeps { + // The list of partition refs that are prevented from building by these missing deps (can be just 1) + repeated PartitionRef impacted = 1; + repeated PartitionRef missing = 2; +} + message WantCreateEvent { string want_id = 1; @@ -48,58 +98,51 @@ message WantCreateEvent { uint64 data_timestamp = 5; uint64 ttl_seconds = 6; uint64 sla_seconds = 7; - WantSource source = 8; + EventSource source = 8; optional string comment = 9; } -message WantSource { - WantSourceType source_type = 1; - string source_name = 2; -} -message WantSourceType { - WantSourceCode code = 1; - string name = 2; -} -enum WantSourceCode{ - Manual = 0; - Automated = 1; - Propagated = 2; -} message WantCancelEvent { string want_id = 1; - optional string reason = 2; + EventSource source = 2; + optional string comment = 3; } message TaintCreateEvent { string taint_id = 1; - repeated PartitionRef partitions = 2; - optional string reason = 3; + string root_taint_id = 2; + string parent_taint_id = 3; + repeated PartitionRef partitions = 4; + EventSource source = 5; + optional string comment = 6; } message TaintDeleteEvent { string taint_id = 1; - optional string reason = 2; + EventSource source = 2; + optional string comment = 3; } // Build State // Represents the whole state of the system message BuildState { - map wants = 1; - map partitions = 2; - map taints = 3; + map wants = 1; + map partitions = 2; + map taints = 3; + map job_runs = 4; } -message WantState { +message WantDetail { string want_id = 1; // TODO } -message PartitionState { +message PartitionDetail { // The partition reference PartitionRef ref = 1; // The partitions current status PartitionStatus status = 2; // The latest update to the partition's status - optional uint64 last_updated_at = 3; + optional uint64 last_updated_at = 3; // IDs that associate the partition with other objects repeated string job_run_ids = 4; repeated string want_ids = 5; @@ -109,16 +152,22 @@ message PartitionStatus { PartitionStatusCode code = 1; string name = 2; } -enum PartitionStatusCode{ +enum PartitionStatusCode { + // TODO how do we avoid copying job states here? Unknown = 0; Wanted = 1; Building = 2; Live = 3; - Tainted = 4; + Failed = 4; + Tainted = 5; } -message TaintState { - +message TaintDetail { + // TODO +} + +message JobRunDetail { + // TODO }