implemented api phase 6
This commit is contained in:
parent
f14d93da7a
commit
556ccb8a4b
2 changed files with 118 additions and 49 deletions
|
|
@ -36,6 +36,7 @@ rust_library(
|
||||||
"@crates//:tokio",
|
"@crates//:tokio",
|
||||||
"@crates//:tower",
|
"@crates//:tower",
|
||||||
"@crates//:tower-http",
|
"@crates//:tower-http",
|
||||||
|
"@crates//:tracing",
|
||||||
"@crates//:uuid",
|
"@crates//:uuid",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ use crate::{
|
||||||
use axum::{
|
use axum::{
|
||||||
Json, Router,
|
Json, Router,
|
||||||
extract::{Path, Query, Request, State},
|
extract::{Path, Query, Request, State},
|
||||||
http::StatusCode,
|
http::{HeaderValue, Method, StatusCode},
|
||||||
middleware::{self, Next},
|
middleware::{self, Next},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
routing::{delete, get, post},
|
routing::{delete, get, post},
|
||||||
|
|
@ -20,6 +20,7 @@ use std::sync::{
|
||||||
};
|
};
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
|
use tower_http::cors::CorsLayer;
|
||||||
|
|
||||||
/// Shared application state for HTTP handlers
|
/// Shared application state for HTTP handlers
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -73,6 +74,15 @@ async fn update_last_request_time(
|
||||||
|
|
||||||
/// Create the Axum router with all endpoints
|
/// Create the Axum router with all endpoints
|
||||||
pub fn create_router(state: AppState) -> Router {
|
pub fn create_router(state: AppState) -> Router {
|
||||||
|
// Configure CORS for web app development
|
||||||
|
let cors = CorsLayer::new()
|
||||||
|
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
|
||||||
|
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
|
||||||
|
.allow_headers([
|
||||||
|
axum::http::header::CONTENT_TYPE,
|
||||||
|
axum::http::header::AUTHORIZATION,
|
||||||
|
]);
|
||||||
|
|
||||||
Router::new()
|
Router::new()
|
||||||
// Health check
|
// Health check
|
||||||
.route("/health", get(health))
|
.route("/health", get(health))
|
||||||
|
|
@ -85,6 +95,8 @@ pub fn create_router(state: AppState) -> Router {
|
||||||
.route("/api/partitions", get(list_partitions))
|
.route("/api/partitions", get(list_partitions))
|
||||||
// Job run endpoints
|
// Job run endpoints
|
||||||
.route("/api/job_runs", get(list_job_runs))
|
.route("/api/job_runs", get(list_job_runs))
|
||||||
|
// Add CORS middleware
|
||||||
|
.layer(cors)
|
||||||
// Add middleware to track request time
|
// Add middleware to track request time
|
||||||
.layer(middleware::from_fn_with_state(
|
.layer(middleware::from_fn_with_state(
|
||||||
state.clone(),
|
state.clone(),
|
||||||
|
|
@ -93,6 +105,34 @@ pub fn create_router(state: AppState) -> Router {
|
||||||
.with_state(state)
|
.with_state(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Error Handling
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Standard error response structure
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct ErrorResponse {
|
||||||
|
error: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
details: Option<serde_json::Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ErrorResponse {
|
||||||
|
fn new(error: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
error: error.into(),
|
||||||
|
details: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_details(error: impl Into<String>, details: serde_json::Value) -> Self {
|
||||||
|
Self {
|
||||||
|
error: error.into(),
|
||||||
|
details: Some(details),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Handlers
|
// Handlers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
@ -111,11 +151,10 @@ async fn list_wants(
|
||||||
let events = match state.bel_storage.list_events(0, 100000) {
|
let events = match state.bel_storage.list_events(0, 100000) {
|
||||||
Ok(events) => events,
|
Ok(events) => events,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to read events from BEL storage: {}", e);
|
||||||
return (
|
return (
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
Json(serde_json::json!({
|
Json(ErrorResponse::new(format!("Failed to read events: {}", e))),
|
||||||
"error": format!("Failed to read events: {}", e)
|
|
||||||
})),
|
|
||||||
)
|
)
|
||||||
.into_response();
|
.into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -135,11 +174,10 @@ async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) ->
|
||||||
let events = match state.bel_storage.list_events(0, 100000) {
|
let events = match state.bel_storage.list_events(0, 100000) {
|
||||||
Ok(events) => events,
|
Ok(events) => events,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to read events from BEL storage: {}", e);
|
||||||
return (
|
return (
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
Json(serde_json::json!({
|
Json(ErrorResponse::new(format!("Failed to read events: {}", e))),
|
||||||
"error": format!("Failed to read events: {}", e)
|
|
||||||
})),
|
|
||||||
)
|
)
|
||||||
.into_response();
|
.into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -147,16 +185,24 @@ async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) ->
|
||||||
|
|
||||||
let build_state = BuildState::from_events(&events);
|
let build_state = BuildState::from_events(&events);
|
||||||
|
|
||||||
let req = GetWantRequest { want_id };
|
let req = GetWantRequest {
|
||||||
|
want_id: want_id.clone(),
|
||||||
|
};
|
||||||
let response = build_state.get_want(&req.want_id);
|
let response = build_state.get_want(&req.want_id);
|
||||||
|
|
||||||
match response {
|
match response {
|
||||||
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
|
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
|
||||||
None => (
|
None => {
|
||||||
StatusCode::NOT_FOUND,
|
tracing::debug!("Want not found: {}", want_id);
|
||||||
Json(serde_json::json!({"error": "Want not found"})),
|
(
|
||||||
)
|
StatusCode::NOT_FOUND,
|
||||||
.into_response(),
|
Json(ErrorResponse::with_details(
|
||||||
|
"Want not found",
|
||||||
|
serde_json::json!({"want_id": want_id}),
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -186,21 +232,33 @@ async fn create_want(
|
||||||
|
|
||||||
// Wait for orchestrator reply
|
// Wait for orchestrator reply
|
||||||
match reply_rx.await {
|
match reply_rx.await {
|
||||||
Ok(Ok(response)) => (StatusCode::OK, Json(response)).into_response(),
|
Ok(Ok(response)) => {
|
||||||
Ok(Err(e)) => (
|
tracing::info!(
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
"Created want: {}",
|
||||||
Json(serde_json::json!({
|
response
|
||||||
"error": format!("Failed to create want: {}", e)
|
.data
|
||||||
})),
|
.as_ref()
|
||||||
)
|
.map(|w| &w.want_id)
|
||||||
.into_response(),
|
.unwrap_or(&"unknown".to_string())
|
||||||
Err(_) => (
|
);
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
(StatusCode::OK, Json(response)).into_response()
|
||||||
Json(serde_json::json!({
|
}
|
||||||
"error": "Orchestrator did not respond"
|
Ok(Err(e)) => {
|
||||||
})),
|
tracing::error!("Failed to create want: {}", e);
|
||||||
)
|
(
|
||||||
.into_response(),
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(format!("Failed to create want: {}", e))),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tracing::error!("Orchestrator did not respond to create want command");
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new("Orchestrator did not respond")),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -234,21 +292,33 @@ async fn cancel_want(
|
||||||
|
|
||||||
// Wait for orchestrator reply
|
// Wait for orchestrator reply
|
||||||
match reply_rx.await {
|
match reply_rx.await {
|
||||||
Ok(Ok(response)) => (StatusCode::OK, Json(response)).into_response(),
|
Ok(Ok(response)) => {
|
||||||
Ok(Err(e)) => (
|
tracing::info!(
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
"Cancelled want: {}",
|
||||||
Json(serde_json::json!({
|
response
|
||||||
"error": format!("Failed to cancel want: {}", e)
|
.data
|
||||||
})),
|
.as_ref()
|
||||||
)
|
.map(|w| &w.want_id)
|
||||||
.into_response(),
|
.unwrap_or(&"unknown".to_string())
|
||||||
Err(_) => (
|
);
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
(StatusCode::OK, Json(response)).into_response()
|
||||||
Json(serde_json::json!({
|
}
|
||||||
"error": "Orchestrator did not respond"
|
Ok(Err(e)) => {
|
||||||
})),
|
tracing::error!("Failed to cancel want: {}", e);
|
||||||
)
|
(
|
||||||
.into_response(),
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new(format!("Failed to cancel want: {}", e))),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tracing::error!("Orchestrator did not respond to cancel want command");
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(ErrorResponse::new("Orchestrator did not respond")),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -260,11 +330,10 @@ async fn list_partitions(
|
||||||
let events = match state.bel_storage.list_events(0, 100000) {
|
let events = match state.bel_storage.list_events(0, 100000) {
|
||||||
Ok(events) => events,
|
Ok(events) => events,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to read events from BEL storage: {}", e);
|
||||||
return (
|
return (
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
Json(serde_json::json!({
|
Json(ErrorResponse::new(format!("Failed to read events: {}", e))),
|
||||||
"error": format!("Failed to read events: {}", e)
|
|
||||||
})),
|
|
||||||
)
|
)
|
||||||
.into_response();
|
.into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -284,11 +353,10 @@ async fn list_job_runs(
|
||||||
let events = match state.bel_storage.list_events(0, 100000) {
|
let events = match state.bel_storage.list_events(0, 100000) {
|
||||||
Ok(events) => events,
|
Ok(events) => events,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to read events from BEL storage: {}", e);
|
||||||
return (
|
return (
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
Json(serde_json::json!({
|
Json(ErrorResponse::new(format!("Failed to read events: {}", e))),
|
||||||
"error": format!("Failed to read events: {}", e)
|
|
||||||
})),
|
|
||||||
)
|
)
|
||||||
.into_response();
|
.into_response();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue