From cfcb201285b2334ae500c004f867bf8c7902c32c Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Thu, 16 Oct 2025 17:54:30 -0700 Subject: [PATCH] implement job canceling --- databuild/databuild.proto | 3 +- databuild/event_transforms.rs | 8 +++- databuild/job_run.rs | 78 ++++++++++++++++++++++++++++------- 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/databuild/databuild.proto b/databuild/databuild.proto index a1513e5..2dc8353 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -197,7 +197,8 @@ enum JobRunStatusCode { JobRunQueued = 0; JobRunRunning = 1; JobRunFailed = 2; - JobRunSucceeded = 3; + JobRunCanceled = 3; + JobRunSucceeded = 4; } message JobRunDetail { string id = 1; diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 5d37a40..59d34dc 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,5 +1,5 @@ use crate::util::current_timestamp; -use crate::{JobRunStatus, JobRunStatusCode, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; +use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; use crate::data_build_event::Event; impl From<&WantCreateEventV1> for WantDetail { @@ -68,3 +68,9 @@ impl From for JobRunStatus { } } } + +impl From for EventSource { + fn from(value: ManuallyTriggeredEvent) -> Self { + Self { source: Some(event_source::Source::ManuallyTriggered(value)) } + } +} diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 5aed740..f93cf6a 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::data_build_event::Event; -use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1}; +use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource}; use std::error::Error; use std::io::{BufRead, BufReader}; use std::process::{Child, Command, Stdio}; @@ -16,7 +16,7 @@ pub trait JobRun { Visit is responsible for observing the state of the job run, */ fn visit(&mut self) -> Result>; - fn cancel(&mut self) -> Result<(), Box>; + fn cancel(&mut self, source: EventSource) -> Result>; fn run_with_env(&mut self, env: Option>) -> Result>; fn run(&mut self) -> Result> { self.run_with_env(None) @@ -43,6 +43,7 @@ pub struct SubProcessJobRun { pub args: Vec, pub stdout_buffer: Vec, // Buffered stdout lines pub exit_code: Option, // Cached exit code + pub cancel_source: Option, // Source of cancellation if canceled } impl JobRun for SubProcessJobRun { @@ -70,12 +71,14 @@ impl JobRun for SubProcessJobRun { if let Some(stdout) = child.stdout.take() { let reader = BufReader::new(stdout); for line in reader.lines() { + // TODO we should write lines to the job's file logs if let Ok(line) = line { self.stdout_buffer.push(line); } } } + // Check exit status match exit_status.code() { Some(0) => { @@ -122,8 +125,27 @@ impl JobRun for SubProcessJobRun { }) } - fn cancel(&mut self) -> Result<(), Box> { - todo!() + fn cancel(&mut self, source: EventSource) -> Result> { + if !self.running { + return Err("cancel() called on non-running job".into()); + } + + let child = self.process.as_mut() + .ok_or("cancel() called but no process present")?; + + // Kill the process + child.kill()?; + + // Wait for it to actually terminate + child.wait()?; + + self.running = false; + + Ok(JobRunCancelEventV1 { + job_run_id: self.job_run_id.to_string(), + source: Some(source), + comment: Some("Job was canceled".to_string()), + }) } /// Mostly for test purposes @@ -151,6 +173,7 @@ impl SubProcessJobRun { args, stdout_buffer: Vec::new(), exit_code: None, + cancel_source: None, })) } } @@ -163,8 +186,9 @@ pub struct JobRunPollResult { mod tests { use std::collections::HashMap; use crate::data_build_event::Event; - use crate::job_run::SubProcessJobRun; - use crate::JobRunStatusCode; + use crate::job_run::{JobRun, SubProcessJobRun}; + use crate::{JobRunStatusCode, ManuallyTriggeredEvent}; + use uuid::Uuid; fn test_helper_path() -> String { std::env::var("TEST_SRCDIR") @@ -208,13 +232,6 @@ mod tests { } } - /// Job that runs for more than 1 heartbeat interval should emit a JobRunHeartbeatEventV1 event - #[test] - #[ignore] - fn test_running_job_run_poll_returns_heartbeat() { - todo!() - } - /// Job run that fails should emit a JobRunFailureEventV1 #[test] fn test_job_run_failure_returns_job_run_failure_event() { @@ -261,9 +278,40 @@ mod tests { /// - Stop the actual subprocess (e.g. no output file should be written) /// - Emitting a JobRunCancelEventV1 event #[test] - #[ignore] fn test_job_run_cancel_returns_job_run_cancel_event() { - todo!() + use std::fs; + use crate::ManuallyTriggeredEvent; + + // Create a temp file path for the test + let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4()); + + // Spawn a job run that will sleep for 1 second and write a file + let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap(); + + let env: HashMap = HashMap::from([ + ("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()), + ("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()), + ("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()), + ]); + job_run.run_with_env(Some(env)).unwrap(); + + // Give it a tiny bit of time to start + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Cancel the job before it can complete - this returns the cancel event + let cancel_event = job_run.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap(); + + // Verify we got the cancel event + assert_eq!(cancel_event.job_run_id, job_run.id().to_string()); + assert!(cancel_event.source.is_some()); + assert_eq!(cancel_event.comment, Some("Job was canceled".to_string())); + + // Verify the output file was NOT written (process was killed before it could complete) + assert!(!std::path::Path::new(&temp_file).exists(), + "Output file should not exist - process should have been killed"); + + // Cleanup just in case + let _ = fs::remove_file(&temp_file); } /// Job run that fails and emits a recognized "dep miss" statement should emit a JobRunMissingDepsEventV1