diff --git a/databuild/data_deps.rs b/databuild/data_deps.rs index 69f92bb..2053c81 100644 --- a/databuild/data_deps.rs +++ b/databuild/data_deps.rs @@ -1,4 +1,7 @@ -use crate::JobRunMissingDeps; +use uuid::Uuid; +use crate::{event_source, EventSource, JobRunMissingDeps, JobTriggeredEvent, MissingDeps, WantAttributedPartitions, WantCreateEventV1, WantDetail}; +use crate::data_build_event::Event; +use crate::event_source::Source; // TODO - how do we version this? pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:"; @@ -15,6 +18,53 @@ fn json_to_missing_deps(line: &str) -> Option { serde_json::from_str(line).ok() } +pub struct WantTimestamps { + data_timestamp: u64, + ttl_seconds: u64, + sla_seconds: u64, +} + +impl From for WantTimestamps { + fn from(want_detail: WantDetail) -> Self { + WantTimestamps { + data_timestamp: want_detail.data_timestamp, + ttl_seconds: want_detail.ttl_seconds, + sla_seconds: want_detail.sla_seconds, + } + } +} + +impl WantTimestamps { + pub fn merge(self, other: WantTimestamps) -> WantTimestamps { + // TODO does this make sense? + WantTimestamps { + data_timestamp: self.data_timestamp.min(other.data_timestamp), + ttl_seconds: self.ttl_seconds.max(other.ttl_seconds), + sla_seconds: self.sla_seconds.max(other.sla_seconds), + } + } +} + +pub fn missing_deps_to_want_events( + missing_deps: Vec, + job_run_id: &Uuid, + want_timestamps: WantTimestamps, +) -> Vec { + missing_deps.iter().map(|md| { + Event::WantCreateV1(WantCreateEventV1 { + want_id: Uuid::new_v4().into(), + partitions: md.missing.clone(), + data_timestamp: want_timestamps.data_timestamp, + ttl_seconds: want_timestamps.ttl_seconds, + sla_seconds: want_timestamps.sla_seconds, + source: Some(JobTriggeredEvent { + job_run_id: job_run_id.to_string(), + }.into()), + comment: Some("Missing data".to_string()), + }) + }).collect() +} + #[cfg(test)] mod tests { use super::*; diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index d3bfd1b..515875c 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,6 +1,6 @@ use crate::data_build_event::Event; use crate::util::current_timestamp; -use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; +use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; impl From<&WantCreateEventV1> for WantDetail { fn from(e: &WantCreateEventV1) -> Self { @@ -74,3 +74,9 @@ impl From for EventSource { Self { source: Some(event_source::Source::ManuallyTriggered(value)) } } } + +impl From for EventSource { + fn from(value: JobTriggeredEvent) -> Self { + Self { source: Some(event_source::Source::JobTriggered(value)) } + } +} diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 4dfb4d5..3ce693a 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,9 +1,6 @@ use crate::data_build_event::Event; use crate::data_deps::parse_log_line; -use crate::{ - EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus, - JobRunSuccessEventV1, MissingDeps, -}; +use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDepsEventV1, JobRunStatus, JobRunSuccessEventV1, MissingDeps}; use std::collections::HashMap; use std::error::Error; use std::io::{BufRead, BufReader}; @@ -348,6 +345,15 @@ impl SubProcessCanceled { } } +impl SubProcessDepMiss { + pub fn to_event(&self, job_run_id: &Uuid) -> Event { + Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 { + job_run_id: job_run_id.to_string(), + missing_deps: self.missing_deps.clone(), + }) + } +} + // Old JobRunPollResult structure - kept for compatibility during migration pub struct JobRunPollResult { pub new_events: Vec, diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index ac564df..7e5287c 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1,6 +1,11 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage}; +use crate::data_build_event::Event; +use crate::data_deps::{missing_deps_to_want_events, WantTimestamps}; use crate::job::JobConfiguration; -use crate::job_run::{NotStartedJobRun, RunningJobRun, CompletedJobRun, FailedJobRun, SubProcessBackend}; +use crate::job_run::{ + CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun, + SubProcessBackend, +}; use crate::{PartitionRef, WantDetail}; use std::collections::HashMap; use std::error::Error; @@ -17,6 +22,7 @@ struct Orchestrator { running_jobs: Vec>, completed_jobs: Vec>, failed_jobs: Vec>, + dep_miss_jobs: Vec>, config: OrchestratorConfig, } @@ -28,6 +34,7 @@ impl Default for Orchestrator { running_jobs: Default::default(), completed_jobs: Default::default(), failed_jobs: Default::default(), + dep_miss_jobs: Default::default(), config: Default::default(), } } @@ -41,6 +48,7 @@ impl Orchestrator { running_jobs: Default::default(), completed_jobs: Default::default(), failed_jobs: Default::default(), + dep_miss_jobs: Default::default(), config: self.config.clone(), } } @@ -114,6 +122,7 @@ impl Orchestrator { running_jobs: Vec::new(), completed_jobs: Vec::new(), failed_jobs: Vec::new(), + dep_miss_jobs: Vec::new(), config, } } @@ -137,18 +146,44 @@ impl Orchestrator { } JobRunVisitResult::Completed(completed) => { // Emit success event - let event = completed.state.to_event(&completed.id()); + let event: Event = completed.state.to_event(&completed.id()); self.bel.append_event(&event)?; self.completed_jobs.push(completed); } JobRunVisitResult::Failed(failed) => { // Emit failure event - let event = failed.state.to_event(&failed.id()); + let event: Event = failed.state.to_event(&failed.id()); self.bel.append_event(&event)?; self.failed_jobs.push(failed); } JobRunVisitResult::DepMiss(dep_miss) => { - todo!(); + // Append literal dep miss + let event: Event = dep_miss.state.to_event(&dep_miss.id()); + self.bel.append_event(&event)?; + // Append wants from dep miss + let job_run_detail = self + .bel + .state + .get_job_run(&dep_miss.job_run_id.to_string()) + .ok_or(format!( + "Unable to find job run with id `{}`", + dep_miss.job_run_id + ))?; + let want_timestamps: WantTimestamps = job_run_detail + .servicing_wants + .iter() + .flat_map(|wap| self.bel.state.get_want(&wap.want_id).map(|w| w.into())) + .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) + .ok_or(format!("No wants found"))?; + let want_events = missing_deps_to_want_events( + dep_miss.state.missing_deps.clone(), + &dep_miss.job_run_id, + want_timestamps, + ); + for event in want_events { + self.bel.append_event(&event)?; + } + self.dep_miss_jobs.push(dep_miss); } } }