fix server panics related to not broadcasting derivative wants
Some checks are pending
/ setup (push) Waiting to run
Some checks are pending
/ setup (push) Waiting to run
This commit is contained in:
parent
895e499cc5
commit
b978be53f5
4 changed files with 131 additions and 30 deletions
|
|
@ -238,16 +238,34 @@ impl BELStorage for SqliteBELStorage {
|
||||||
pub struct BuildEventLog<S: BELStorage + Debug> {
|
pub struct BuildEventLog<S: BELStorage + Debug> {
|
||||||
pub storage: S,
|
pub storage: S,
|
||||||
pub state: BuildState,
|
pub state: BuildState,
|
||||||
|
/// Optional event broadcaster for HTTP server mirroring
|
||||||
|
#[cfg_attr(not(feature = "server"), allow(dead_code))]
|
||||||
|
pub event_broadcaster: Option<tokio::sync::broadcast::Sender<Event>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: BELStorage + Debug> BuildEventLog<S> {
|
impl<S: BELStorage + Debug> BuildEventLog<S> {
|
||||||
pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
|
pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
|
||||||
BuildEventLog { storage, state }
|
BuildEventLog {
|
||||||
|
storage,
|
||||||
|
state,
|
||||||
|
event_broadcaster: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_broadcaster(mut self, broadcaster: tokio::sync::broadcast::Sender<Event>) -> Self {
|
||||||
|
self.event_broadcaster = Some(broadcaster);
|
||||||
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
|
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
|
||||||
let events = self.state.handle_event(&event);
|
let events = self.state.handle_event(&event);
|
||||||
let idx = self.storage.append_event(event)?;
|
let idx = self.storage.append_event(event)?;
|
||||||
|
|
||||||
|
// Broadcast event to HTTP server (if configured)
|
||||||
|
if let Some(ref tx) = self.event_broadcaster {
|
||||||
|
let _ = tx.send(event.clone());
|
||||||
|
}
|
||||||
|
|
||||||
// Recursion here might be dangerous, but in theory the event propagation always terminates
|
// Recursion here might be dangerous, but in theory the event propagation always terminates
|
||||||
for event in events {
|
for event in events {
|
||||||
self.append_event(&event)?;
|
self.append_event(&event)?;
|
||||||
|
|
@ -327,6 +345,7 @@ impl Clone for BuildEventLog<MemoryBELStorage> {
|
||||||
Self {
|
Self {
|
||||||
storage: self.storage.clone(),
|
storage: self.storage.clone(),
|
||||||
state: self.state.clone(),
|
state: self.state.clone(),
|
||||||
|
event_broadcaster: self.event_broadcaster.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -344,7 +363,11 @@ mod tests {
|
||||||
let storage =
|
let storage =
|
||||||
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
|
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
|
||||||
let state = BuildState::default();
|
let state = BuildState::default();
|
||||||
let mut log = BuildEventLog { storage, state };
|
let mut log = BuildEventLog {
|
||||||
|
storage,
|
||||||
|
state,
|
||||||
|
event_broadcaster: None,
|
||||||
|
};
|
||||||
|
|
||||||
let want_id = "sqlite_test_1234".to_string();
|
let want_id = "sqlite_test_1234".to_string();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -158,8 +158,15 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
|
||||||
let mut mirror_rx = event_tx.subscribe();
|
let mut mirror_rx = event_tx.subscribe();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Ok(event) = mirror_rx.recv().await {
|
while let Ok(event) = mirror_rx.recv().await {
|
||||||
let mut state = mirror_clone.write().unwrap();
|
match mirror_clone.write() {
|
||||||
state.handle_event(&event);
|
Ok(mut state) => {
|
||||||
|
state.handle_event(&event);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("State mirror task: RwLock poisoned, cannot update state: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -152,7 +152,19 @@ async fn list_wants(
|
||||||
Query(params): Query<ListWantsRequest>,
|
Query(params): Query<ListWantsRequest>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// Read from shared build state
|
// Read from shared build state
|
||||||
let build_state = state.build_state.read().unwrap();
|
let build_state = match state.build_state.read() {
|
||||||
|
Ok(state) => state,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to acquire read lock on build state: {}", e);
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(
|
||||||
|
"Internal server error: state lock poisoned",
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
.into_response();
|
||||||
|
}
|
||||||
|
};
|
||||||
let response = build_state.list_wants(¶ms);
|
let response = build_state.list_wants(¶ms);
|
||||||
|
|
||||||
(StatusCode::OK, Json(response)).into_response()
|
(StatusCode::OK, Json(response)).into_response()
|
||||||
|
|
@ -161,7 +173,19 @@ async fn list_wants(
|
||||||
/// Get a specific want by ID
|
/// Get a specific want by ID
|
||||||
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
|
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
|
||||||
// Read from shared build state
|
// Read from shared build state
|
||||||
let build_state = state.build_state.read().unwrap();
|
let build_state = match state.build_state.read() {
|
||||||
|
Ok(state) => state,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to acquire read lock on build state: {}", e);
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(
|
||||||
|
"Internal server error: state lock poisoned",
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
.into_response();
|
||||||
|
}
|
||||||
|
};
|
||||||
let response = build_state.get_want(&want_id);
|
let response = build_state.get_want(&want_id);
|
||||||
|
|
||||||
match response {
|
match response {
|
||||||
|
|
@ -302,7 +326,19 @@ async fn list_partitions(
|
||||||
Query(params): Query<ListPartitionsRequest>,
|
Query(params): Query<ListPartitionsRequest>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// Read from shared build state
|
// Read from shared build state
|
||||||
let build_state = state.build_state.read().unwrap();
|
let build_state = match state.build_state.read() {
|
||||||
|
Ok(state) => state,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to acquire read lock on build state: {}", e);
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(
|
||||||
|
"Internal server error: state lock poisoned",
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
.into_response();
|
||||||
|
}
|
||||||
|
};
|
||||||
let response = build_state.list_partitions(¶ms);
|
let response = build_state.list_partitions(¶ms);
|
||||||
|
|
||||||
(StatusCode::OK, Json(response)).into_response()
|
(StatusCode::OK, Json(response)).into_response()
|
||||||
|
|
@ -314,7 +350,19 @@ async fn list_job_runs(
|
||||||
Query(params): Query<ListJobRunsRequest>,
|
Query(params): Query<ListJobRunsRequest>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// Read from shared build state
|
// Read from shared build state
|
||||||
let build_state = state.build_state.read().unwrap();
|
let build_state = match state.build_state.read() {
|
||||||
|
Ok(state) => state,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to acquire read lock on build state: {}", e);
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(
|
||||||
|
"Internal server error: state lock poisoned",
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
.into_response();
|
||||||
|
}
|
||||||
|
};
|
||||||
let response = build_state.list_job_runs(¶ms);
|
let response = build_state.list_job_runs(¶ms);
|
||||||
|
|
||||||
(StatusCode::OK, Json(response)).into_response()
|
(StatusCode::OK, Json(response)).into_response()
|
||||||
|
|
|
||||||
|
|
@ -164,7 +164,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
||||||
event_tx: broadcast::Sender<Event>,
|
event_tx: broadcast::Sender<Event>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
bel: BuildEventLog::new(storage, Default::default()),
|
bel: BuildEventLog::new(storage, Default::default()).with_broadcaster(event_tx.clone()),
|
||||||
config,
|
config,
|
||||||
job_runs: Vec::new(),
|
job_runs: Vec::new(),
|
||||||
command_rx: Some(command_rx),
|
command_rx: Some(command_rx),
|
||||||
|
|
@ -173,14 +173,9 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append event to BEL and broadcast to HTTP server (if configured)
|
/// Append event to BEL (which handles state mutation, storage, broadcasting, and cascading events)
|
||||||
fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
|
fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
|
||||||
let idx = self.bel.append_event(event)?;
|
self.bel.append_event(event)
|
||||||
if let Some(tx) = &self.event_tx {
|
|
||||||
// Ignore send errors (no receivers is fine)
|
|
||||||
let _ = tx.send(event.clone());
|
|
||||||
}
|
|
||||||
Ok(idx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
|
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
|
||||||
|
|
@ -241,23 +236,32 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
||||||
let transitioned = match job {
|
let transitioned = match job {
|
||||||
JobRunHandle::Running(running) => match running.visit()? {
|
JobRunHandle::Running(running) => match running.visit()? {
|
||||||
VisitResult::StillRunning(still_running) => {
|
VisitResult::StillRunning(still_running) => {
|
||||||
println!("Still running job: {:?}", still_running.job_run_id);
|
tracing::debug!(
|
||||||
|
job_run_id = still_running.job_run_id.to_string(),
|
||||||
|
"Job still running"
|
||||||
|
);
|
||||||
JobRunHandle::Running(still_running)
|
JobRunHandle::Running(still_running)
|
||||||
}
|
}
|
||||||
VisitResult::Completed(completed) => {
|
VisitResult::Completed(completed) => {
|
||||||
println!("Completed job: {:?}", completed.job_run_id);
|
tracing::debug!(
|
||||||
|
job_run_id = completed.job_run_id.to_string(),
|
||||||
|
"Completed job"
|
||||||
|
);
|
||||||
let event = completed.state.to_event(&completed.job_run_id);
|
let event = completed.state.to_event(&completed.job_run_id);
|
||||||
events_to_append.push(event);
|
events_to_append.push(event);
|
||||||
JobRunHandle::Completed(completed)
|
JobRunHandle::Completed(completed)
|
||||||
}
|
}
|
||||||
VisitResult::Failed(failed) => {
|
VisitResult::Failed(failed) => {
|
||||||
println!("Failed job: {:?}", failed.job_run_id);
|
tracing::debug!(job_run_id = failed.job_run_id.to_string(), "Failed job");
|
||||||
let event = failed.state.to_event(&failed.job_run_id);
|
let event = failed.state.to_event(&failed.job_run_id);
|
||||||
events_to_append.push(event);
|
events_to_append.push(event);
|
||||||
JobRunHandle::Failed(failed)
|
JobRunHandle::Failed(failed)
|
||||||
}
|
}
|
||||||
VisitResult::DepMiss(dep_miss) => {
|
VisitResult::DepMiss(dep_miss) => {
|
||||||
println!("Dep miss job: {:?}", dep_miss.job_run_id);
|
tracing::debug!(
|
||||||
|
job_run_id = dep_miss.job_run_id.to_string(),
|
||||||
|
"Job dep miss"
|
||||||
|
);
|
||||||
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
|
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
|
||||||
events_to_append.push(event);
|
events_to_append.push(event);
|
||||||
JobRunHandle::DepMiss(dep_miss)
|
JobRunHandle::DepMiss(dep_miss)
|
||||||
|
|
@ -357,18 +361,37 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
|
||||||
|
|
||||||
/// Process all pending commands from the HTTP server
|
/// Process all pending commands from the HTTP server
|
||||||
fn process_commands(&mut self) -> Result<(), DatabuildError> {
|
fn process_commands(&mut self) -> Result<(), DatabuildError> {
|
||||||
|
// TODO Handle command failure gracefully - what's story there?
|
||||||
|
// Collect all pending commands first (to avoid borrow checker issues)
|
||||||
|
let mut commands = Vec::new();
|
||||||
if let Some(ref mut rx) = self.command_rx {
|
if let Some(ref mut rx) = self.command_rx {
|
||||||
// Process all pending commands (non-blocking)
|
|
||||||
while let Ok(cmd) = rx.try_recv() {
|
while let Ok(cmd) = rx.try_recv() {
|
||||||
match cmd {
|
commands.push(cmd);
|
||||||
Command::CreateWant { request, reply } => {
|
}
|
||||||
let response = self.bel.api_handle_want_create(request);
|
}
|
||||||
let _ = reply.send(response); // Ignore send errors
|
|
||||||
}
|
// Now process all collected commands
|
||||||
Command::CancelWant { request, reply } => {
|
for cmd in commands {
|
||||||
let response = self.bel.api_handle_want_cancel(request);
|
match cmd {
|
||||||
let _ = reply.send(response); // Ignore send errors
|
Command::CreateWant { request, reply } => {
|
||||||
}
|
// Convert request to event and broadcast it (so HTTP server receives it)
|
||||||
|
let event: crate::WantCreateEventV1 = request.into();
|
||||||
|
let result = self
|
||||||
|
.append_and_broadcast(&crate::data_build_event::Event::WantCreateV1(
|
||||||
|
event.clone(),
|
||||||
|
))
|
||||||
|
.map(|_| self.bel.state.get_want(&event.want_id).into());
|
||||||
|
let _ = reply.send(result); // Ignore send errors
|
||||||
|
}
|
||||||
|
Command::CancelWant { request, reply } => {
|
||||||
|
// Convert request to event and broadcast it (so HTTP server receives it)
|
||||||
|
let event: crate::WantCancelEventV1 = request.into();
|
||||||
|
let result = self
|
||||||
|
.append_and_broadcast(&crate::data_build_event::Event::WantCancelV1(
|
||||||
|
event.clone(),
|
||||||
|
))
|
||||||
|
.map(|_| self.bel.state.get_want(&event.want_id).into());
|
||||||
|
let _ = reply.send(result); // Ignore send errors
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue