// databuild/databuild/data_deps.rs

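//! Parse missing-dependency reports from job run logs and turn them into
//! want-creation events.
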
use uuid::Uuid;
use crate::{event_source, EventSource, JobRunMissingDeps, JobTriggeredEvent, MissingDeps, WantAttributedPartitions, WantCreateEventV1, WantDetail};
use crate::data_build_event::Event;
use crate::event_source::Source;
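
/// Prefix a job run writes at the start of a log line to report missing
/// dependencies as JSON.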
// TODO - how do we version this?
pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
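
/// Parses a single log line, returning the missing-deps payload when the line
/// carries the `DATABUILD_JSON` prefix and the remainder is valid JSON.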
pub fn parse_log_line(line: &str) -> Option<JobRunMissingDeps> {
    line_matches(line).and_then(json_to_missing_deps)
}
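
/// Returns the payload that follows the `DATABUILD_JSON` prefix, if present.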
fn line_matches(line: &str) -> Option<&str> {
    line.trim().strip_prefix(DATABUILD_JSON)
}
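
/// Deserializes the JSON payload, discarding lines that fail to parse.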
fn json_to_missing_deps(line: &str) -> Option<JobRunMissingDeps> {
    serde_json::from_str(line).ok()
}
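
/// Data timestamp, TTL, and SLA values taken from a `WantDetail`, reused when
/// creating wants for missing dependencies.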
pub struct WantTimestamps {
    data_timestamp: u64,
    ttl_seconds: u64,
    sla_seconds: u64,
}

impl From<WantDetail> for WantTimestamps {
    fn from(want_detail: WantDetail) -> Self {
        WantTimestamps {
            data_timestamp: want_detail.data_timestamp,
            ttl_seconds: want_detail.ttl_seconds,
            sla_seconds: want_detail.sla_seconds,
        }
    }
}

impl WantTimestamps {
    /// Combines two sets of timestamps, keeping the earliest data timestamp
    /// and the largest TTL and SLA.
    pub fn merge(self, other: WantTimestamps) -> WantTimestamps {
        // TODO does this make sense?
        WantTimestamps {
            data_timestamp: self.data_timestamp.min(other.data_timestamp),
            ttl_seconds: self.ttl_seconds.max(other.ttl_seconds),
            sla_seconds: self.sla_seconds.max(other.sla_seconds),
        }
    }
}
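
/// Builds one `WantCreateV1` event per missing-deps entry, attributing each
/// new want to the job run that reported the gap.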
pub fn missing_deps_to_want_events(
    missing_deps: Vec<MissingDeps>,
    job_run_id: &Uuid,
    want_timestamps: WantTimestamps,
) -> Vec<Event> {
    missing_deps
        .iter()
        .map(|md| {
            Event::WantCreateV1(WantCreateEventV1 {
                want_id: Uuid::new_v4().into(),
                partitions: md.missing.clone(),
                data_timestamp: want_timestamps.data_timestamp,
                ttl_seconds: want_timestamps.ttl_seconds,
                sla_seconds: want_timestamps.sla_seconds,
                source: Some(JobTriggeredEvent {
                    job_run_id: job_run_id.to_string(),
                }.into()),
                comment: Some("Missing data".to_string()),
            })
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_missing_deps_with_1_to_1_and_1_to_n() {
        let log_line = r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#;
        let result = parse_log_line(log_line);
        assert!(result.is_some());

        let missing_deps = result.unwrap();
        assert_eq!(missing_deps.missing_deps.len(), 2);

        // First entry: 1:1 (one missing input -> one impacted output)
        assert_eq!(missing_deps.missing_deps[0].impacted.len(), 1);
        assert_eq!(missing_deps.missing_deps[0].impacted[0].r#ref, "output/p1");
        assert_eq!(missing_deps.missing_deps[0].missing.len(), 1);
        assert_eq!(missing_deps.missing_deps[0].missing[0].r#ref, "input/p1");

        // Second entry: 1:N (one missing input -> multiple impacted outputs)
        assert_eq!(missing_deps.missing_deps[1].impacted.len(), 2);
        assert_eq!(missing_deps.missing_deps[1].impacted[0].r#ref, "output/p2");
        assert_eq!(missing_deps.missing_deps[1].impacted[1].r#ref, "output/p3");
        assert_eq!(missing_deps.missing_deps[1].missing.len(), 1);
        assert_eq!(missing_deps.missing_deps[1].missing[0].r#ref, "input/p2");
    }
}