implement job canceling

This commit is contained in:
Stuart Axelbrooke 2025-10-16 17:54:30 -07:00
parent 7debea96a2
commit cfcb201285
3 changed files with 72 additions and 17 deletions

View file

@ -197,7 +197,8 @@ enum JobRunStatusCode {
JobRunQueued = 0;
JobRunRunning = 1;
JobRunFailed = 2;
JobRunSucceeded = 3;
JobRunCanceled = 3;
JobRunSucceeded = 4;
}
message JobRunDetail {
string id = 1;

View file

@ -1,5 +1,5 @@
use crate::util::current_timestamp;
use crate::{JobRunStatus, JobRunStatusCode, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::{event_source, EventSource, JobRunStatus, JobRunStatusCode, ManuallyTriggeredEvent, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::data_build_event::Event;
impl From<&WantCreateEventV1> for WantDetail {
@ -68,3 +68,9 @@ impl From<JobRunStatusCode> for JobRunStatus {
}
}
}
impl From<ManuallyTriggeredEvent> for EventSource {
fn from(value: ManuallyTriggeredEvent) -> Self {
Self { source: Some(event_source::Source::ManuallyTriggered(value)) }
}
}

View file

@ -1,7 +1,7 @@
use std::collections::HashMap;
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1};
use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource};
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio};
@ -16,7 +16,7 @@ pub trait JobRun {
Visit is responsible for observing the state of the job run,
*/
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
fn cancel(&mut self) -> Result<(), Box<dyn Error>>;
fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>>;
fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>>;
fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
self.run_with_env(None)
@ -43,6 +43,7 @@ pub struct SubProcessJobRun {
pub args: Vec<String>,
pub stdout_buffer: Vec<String>, // Buffered stdout lines
pub exit_code: Option<i32>, // Cached exit code
pub cancel_source: Option<EventSource>, // Source of cancellation if canceled
}
impl JobRun for SubProcessJobRun {
@ -70,12 +71,14 @@ impl JobRun for SubProcessJobRun {
if let Some(stdout) = child.stdout.take() {
let reader = BufReader::new(stdout);
for line in reader.lines() {
// TODO we should write lines to the job's file logs
if let Ok(line) = line {
self.stdout_buffer.push(line);
}
}
}
// Check exit status
match exit_status.code() {
Some(0) => {
@ -122,8 +125,27 @@ impl JobRun for SubProcessJobRun {
})
}
fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
todo!()
fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>> {
if !self.running {
return Err("cancel() called on non-running job".into());
}
let child = self.process.as_mut()
.ok_or("cancel() called but no process present")?;
// Kill the process
child.kill()?;
// Wait for it to actually terminate
child.wait()?;
self.running = false;
Ok(JobRunCancelEventV1 {
job_run_id: self.job_run_id.to_string(),
source: Some(source),
comment: Some("Job was canceled".to_string()),
})
}
/// Mostly for test purposes
@ -151,6 +173,7 @@ impl SubProcessJobRun {
args,
stdout_buffer: Vec::new(),
exit_code: None,
cancel_source: None,
}))
}
}
@ -163,8 +186,9 @@ pub struct JobRunPollResult {
mod tests {
use std::collections::HashMap;
use crate::data_build_event::Event;
use crate::job_run::SubProcessJobRun;
use crate::JobRunStatusCode;
use crate::job_run::{JobRun, SubProcessJobRun};
use crate::{JobRunStatusCode, ManuallyTriggeredEvent};
use uuid::Uuid;
fn test_helper_path() -> String {
std::env::var("TEST_SRCDIR")
@ -208,13 +232,6 @@ mod tests {
}
}
/// Job that runs for more than 1 heartbeat interval should emit a JobRunHeartbeatEventV1 event
#[test]
#[ignore]
fn test_running_job_run_poll_returns_heartbeat() {
todo!()
}
/// Job run that fails should emit a JobRunFailureEventV1
#[test]
fn test_job_run_failure_returns_job_run_failure_event() {
@ -261,9 +278,40 @@ mod tests {
/// - Stop the actual subprocess (e.g. no output file should be written)
/// - Emitting a JobRunCancelEventV1 event
#[test]
#[ignore]
fn test_job_run_cancel_returns_job_run_cancel_event() {
todo!()
use std::fs;
use crate::ManuallyTriggeredEvent;
// Create a temp file path for the test
let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
// Spawn a job run that will sleep for 1 second and write a file
let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()),
]);
job_run.run_with_env(Some(env)).unwrap();
// Give it a tiny bit of time to start
std::thread::sleep(std::time::Duration::from_millis(10));
// Cancel the job before it can complete - this returns the cancel event
let cancel_event = job_run.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
// Verify we got the cancel event
assert_eq!(cancel_event.job_run_id, job_run.id().to_string());
assert!(cancel_event.source.is_some());
assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
// Verify the output file was NOT written (process was killed before it could complete)
assert!(!std::path::Path::new(&temp_file).exists(),
"Output file should not exist - process should have been killed");
// Cleanup just in case
let _ = fs::remove_file(&temp_file);
}
/// Job run that fails and emits a recognized "dep miss" statement should emit a JobRunMissingDepsEventV1