diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 9bfe67424c..ffe8ad2527 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -198,6 +198,10 @@ fn main() -> Result<(), Box> { tonic_build::configure() .type_attribute(".", "#[derive(Serialize, Deserialize)]") .type_attribute("StatusCode", r#"#[serde(rename_all = "snake_case")]"#) + .type_attribute( + "ExportLogsServiceResponse", + r#"#[derive(utoipa::ToSchema)]"#, + ) .out_dir("src/codegen/opentelemetry") .compile_with_config(prost_config, &protos, &["protos/third-party"])?; Ok(()) diff --git a/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs index 1f85018327..68fd0e1d32 100644 --- a/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs +++ b/quickwit/quickwit-proto/src/codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs @@ -13,6 +13,7 @@ pub struct ExportLogsServiceRequest { >, } #[derive(Serialize, Deserialize)] +#[derive(utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExportLogsServiceResponse { diff --git a/quickwit/quickwit-serve/src/cluster_api/rest_handler.rs b/quickwit/quickwit-serve/src/cluster_api/rest_handler.rs index c3c5800610..ddd8f4ffa0 100644 --- a/quickwit/quickwit-serve/src/cluster_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/cluster_api/rest_handler.rs @@ -23,6 +23,7 @@ use quickwit_cluster::{Cluster, ClusterSnapshot, NodeIdSchema}; use warp::{Filter, Rejection}; use crate::format::extract_format_from_qs; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; #[derive(utoipa::OpenApi)] @@ -43,6 +44,7 @@ pub fn cluster_handler( .then(get_cluster) .and(extract_format_from_qs()) .map(into_rest_api_response) + .recover(recover_fn) } #[utoipa::path( diff --git a/quickwit/quickwit-serve/src/delete_task_api/handler.rs b/quickwit/quickwit-serve/src/delete_task_api/handler.rs index 9542b2807e..755549d0c0 100644 --- a/quickwit/quickwit-serve/src/delete_task_api/handler.rs +++ b/quickwit/quickwit-serve/src/delete_task_api/handler.rs @@ -31,6 +31,7 @@ use serde::Deserialize; use warp::{Filter, Rejection}; use crate::format::extract_format_from_qs; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; use crate::with_arg; @@ -61,7 +62,9 @@ pub struct DeleteQueryRequest { pub fn delete_task_api_handlers( metastore: MetastoreServiceClient, ) -> impl Filter + Clone { - get_delete_tasks_handler(metastore.clone()).or(post_delete_tasks_handler(metastore.clone())) + get_delete_tasks_handler(metastore.clone()) + .or(post_delete_tasks_handler(metastore.clone())) + .recover(recover_fn) } pub fn get_delete_tasks_handler( diff --git a/quickwit/quickwit-serve/src/developer_api/log_level.rs b/quickwit/quickwit-serve/src/developer_api/log_level.rs index 316e78d8f8..cb52289156 100644 --- a/quickwit/quickwit-serve/src/developer_api/log_level.rs +++ b/quickwit/quickwit-serve/src/developer_api/log_level.rs @@ -29,7 +29,8 @@ struct EnvFilter { filter: String, } -#[utoipa::path(get, tag = "Log level", path = "/log-level")] +/// Dynamically Quickwit's log level +#[utoipa::path(get, tag = "Debug", path = "/log-level")] pub fn log_level_handler( env_filter_reload_fn: EnvFilterReloadFn, ) -> impl warp::Filter + Clone { diff --git a/quickwit/quickwit-serve/src/developer_api/mod.rs b/quickwit/quickwit-serve/src/developer_api/mod.rs index 491f57fe66..0a4f8a2944 100644 --- a/quickwit/quickwit-serve/src/developer_api/mod.rs +++ b/quickwit/quickwit-serve/src/developer_api/mod.rs @@ -29,6 +29,7 @@ use quickwit_cluster::Cluster; pub(crate) use server::DeveloperApiServer; use warp::{Filter, Rejection}; +use crate::rest::recover_fn; use crate::EnvFilterReloadFn; #[derive(utoipa::OpenApi)] @@ -39,9 +40,11 @@ pub(crate) fn developer_api_routes( cluster: Cluster, env_filter_reload_fn: EnvFilterReloadFn, ) -> impl Filter + Clone { - warp::path!("api" / "developer" / ..).and( - debug_handler(cluster.clone()) - .or(log_level_handler(env_filter_reload_fn.clone())) - .or(pprof_handlers()), - ) + warp::path!("api" / "developer" / ..) + .and( + debug_handler(cluster.clone()) + .or(log_level_handler(env_filter_reload_fn.clone())) + .or(pprof_handlers()), + ) + .recover(recover_fn) } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs index ab3047324f..24df36ac3e 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs @@ -35,6 +35,7 @@ use crate::elasticsearch_api::make_elastic_api_response; use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError}; use crate::format::extract_format_from_qs; use crate::ingest_api::lines; +use crate::rest::recover_fn; use crate::{with_arg, Body}; /// POST `_elastic/_bulk` @@ -50,6 +51,7 @@ pub fn es_compat_bulk_handler( }) .and(extract_format_from_qs()) .map(make_elastic_api_response) + .recover(recover_fn) } /// POST `_elastic//_bulk` @@ -73,6 +75,7 @@ pub fn es_compat_index_bulk_handler( ) .and(extract_format_from_qs()) .map(make_elastic_api_response) + .recover(recover_fn) } async fn elastic_ingest_bulk( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index b2ef41b9a4..518bb6519b 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -177,8 +177,8 @@ pub(crate) fn elastic_index_count_filter( pub(crate) fn elastic_delete_index_filter( ) -> impl Filter, DeleteQueryParams), Error = Rejection> + Clone { warp::path!("_elastic" / String) - .and_then(extract_index_id_patterns) .and(warp::delete()) + .and_then(extract_index_id_patterns) .and(serde_qs::warp::query(serde_qs::Config::default())) } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 1419d54d2b..4acf8f2e38 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -45,6 +45,7 @@ use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; use crate::elasticsearch_api::model::ElasticsearchError; +use crate::rest::recover_fn; use crate::rest_api_response::RestApiResponse; use crate::{BodyFormat, BuildInfo}; @@ -79,6 +80,7 @@ pub fn elastic_api_handlers( .or(es_compat_stats_handler(metastore.clone())) .or(es_compat_index_cat_indices_handler(metastore.clone())) .or(es_compat_cat_indices_handler(metastore.clone())) + .recover(recover_fn) // Register newly created handlers here. } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index ed17189672..1c5402d06f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -62,6 +62,7 @@ use super::model::{ }; use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; +use crate::rest::recover_fn; use crate::rest_api_response::{RestApiError, RestApiResponse}; use crate::{with_arg, BuildInfo}; @@ -93,20 +94,22 @@ pub fn es_compat_cluster_info_handler( pub fn es_compat_search_handler( _search_service: Arc, ) -> impl Filter + Clone { - elasticsearch_filter().then(|_params: SearchQueryParams| async move { - // TODO - let api_error = RestApiError { - status_code: StatusCode::NOT_IMPLEMENTED, - message: "_elastic/_search is not supported yet. Please try the index search endpoint \ - (_elastic/{index}/search)" - .to_string(), - }; - RestApiResponse::new::<(), _>( - &Err(api_error), - StatusCode::NOT_IMPLEMENTED, - BodyFormat::default(), - ) - }) + elasticsearch_filter() + .then(|_params: SearchQueryParams| async move { + // TODO + let api_error = RestApiError { + status_code: StatusCode::NOT_IMPLEMENTED, + message: "_elastic/_search is not supported yet. Please try the index search \ + endpoint (_elastic/{index}/search)" + .to_string(), + }; + RestApiResponse::new::<(), _>( + &Err(api_error), + StatusCode::NOT_IMPLEMENTED, + BodyFormat::default(), + ) + }) + .recover(recover_fn) } /// GET or POST _elastic/{index}/_field_caps @@ -119,6 +122,7 @@ pub fn es_compat_index_field_capabilities_handler( .and(with_arg(search_service)) .then(es_compat_index_field_capabilities) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// DELETE _elastic/{index} @@ -139,6 +143,7 @@ pub fn es_compat_stats_handler( .and(with_arg(search_service)) .then(es_compat_stats) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// GET _elastic/{index}/_stats @@ -149,6 +154,7 @@ pub fn es_compat_index_stats_handler( .and(with_arg(search_service)) .then(es_compat_index_stats) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// GET _elastic/_cat/indices @@ -159,6 +165,7 @@ pub fn es_compat_cat_indices_handler( .and(with_arg(search_service)) .then(es_compat_cat_indices) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// GET _elastic/_cat/indices/{index} @@ -169,6 +176,7 @@ pub fn es_compat_index_cat_indices_handler( .and(with_arg(search_service)) .then(es_compat_index_cat_indices) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// GET or POST _elastic/{index}/_search @@ -179,6 +187,7 @@ pub fn es_compat_index_search_handler( .and(with_arg(search_service)) .then(es_compat_index_search) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } /// GET or POST _elastic/{index}/_count @@ -189,9 +198,10 @@ pub fn es_compat_index_count_handler( .and(with_arg(search_service)) .then(es_compat_index_count) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } -/// POST _elastic/_search +/// POST _elastic/_msearch pub fn es_compat_index_multi_search_handler( search_service: Arc, ) -> impl Filter + Clone { @@ -205,6 +215,7 @@ pub fn es_compat_index_multi_search_handler( }; RestApiResponse::new(&result, status_code, BodyFormat::default()) }) + .recover(recover_fn) } /// GET or POST _elastic/_search/scroll @@ -215,6 +226,7 @@ pub fn es_compat_scroll_handler( .and(with_arg(search_service)) .then(es_scroll) .map(|result| make_elastic_api_response(result, BodyFormat::default())) + .recover(recover_fn) } fn build_request_for_es_api( diff --git a/quickwit/quickwit-serve/src/health_check_api/handler.rs b/quickwit/quickwit-serve/src/health_check_api/handler.rs index b8032dc3b2..15ac5098ef 100644 --- a/quickwit/quickwit-serve/src/health_check_api/handler.rs +++ b/quickwit/quickwit-serve/src/health_check_api/handler.rs @@ -26,6 +26,7 @@ use warp::hyper::StatusCode; use warp::reply::with_status; use warp::{Filter, Rejection}; +use crate::rest::recover_fn; use crate::with_arg; #[derive(utoipa::OpenApi)] @@ -50,6 +51,7 @@ fn liveness_handler( .and(with_arg(indexer_service_opt)) .and(with_arg(janitor_service_opt)) .then(get_liveness) + .recover(recover_fn) } fn readiness_handler( @@ -59,6 +61,7 @@ fn readiness_handler( .and(warp::get()) .and(with_arg(cluster)) .then(get_readiness) + .recover(recover_fn) } #[utoipa::path( diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index f5a783901d..12e2f22b36 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -47,6 +47,7 @@ use tracing::{info, warn}; use warp::{Filter, Rejection}; use crate::format::{extract_config_format, extract_format_from_qs}; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; use crate::simple_list::{from_simple_list, to_simple_list}; use crate::with_arg; @@ -107,6 +108,7 @@ pub fn index_management_handlers( .or(analyze_request_handler()) // Parse query into query AST handler. .or(parse_query_request_handler()) + .recover(recover_fn) } fn json_body( diff --git a/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs b/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs index 5ac1eaaf52..5f2bf9cf3a 100644 --- a/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/indexing_api/rest_handler.rs @@ -25,6 +25,7 @@ use warp::{Filter, Rejection}; use crate::format::extract_format_from_qs; use crate::require; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; #[derive(utoipa::OpenApi)] @@ -60,4 +61,5 @@ pub fn indexing_get_handler( .then(indexing_endpoint) .and(extract_format_from_qs()) .map(into_rest_api_response) + .recover(recover_fn) } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 238492d6b1..c50b675ded 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -39,6 +39,7 @@ use crate::jaeger_api::model::{ JaegerError, JaegerResponseBody, JaegerSpan, JaegerTrace, TracesSearchQueryParams, DEFAULT_NUMBER_OF_TRACES, }; +use crate::rest::recover_fn; use crate::rest_api_response::RestApiResponse; use crate::search_api::extract_index_id_patterns; use crate::{require, BodyFormat}; @@ -66,6 +67,7 @@ pub(crate) fn jaeger_api_handlers( )) .or(jaeger_traces_search_handler(jaeger_service_opt.clone())) .or(jaeger_traces_handler(jaeger_service_opt.clone())) + .recover(recover_fn) } fn jaeger_api_path_filter() -> impl Filter,), Error = Rejection> + Clone { @@ -349,13 +351,13 @@ mod tests { #[tokio::test] async fn test_when_jaeger_not_found() { - let jaeger_api_handler = jaeger_api_handlers(None).recover(recover_fn); + let jaeger_api_handler = jaeger_api_handlers(None).recover(crate::rest::recover_fn_final); let resp = warp::test::request() .path("/otel-traces-v0_7/jaeger/api/services") .reply(&jaeger_api_handler) .await; - let error_body = serde_json::from_slice::>(resp.body()).unwrap(); assert_eq!(resp.status(), 404); + let error_body = serde_json::from_slice::>(resp.body()).unwrap(); assert!(error_body.contains_key("message")); assert_eq!(error_body.get("message").unwrap(), "Route not found"); } diff --git a/quickwit/quickwit-serve/src/node_info_handler.rs b/quickwit/quickwit-serve/src/node_info_handler.rs index 9d170c34ff..b1791be9d6 100644 --- a/quickwit/quickwit-serve/src/node_info_handler.rs +++ b/quickwit/quickwit-serve/src/node_info_handler.rs @@ -23,6 +23,7 @@ use quickwit_config::NodeConfig; use serde_json::json; use warp::{Filter, Rejection}; +use crate::rest::recover_fn; use crate::{with_arg, BuildInfo, RuntimeInfo}; #[derive(utoipa::OpenApi)] @@ -34,7 +35,9 @@ pub fn node_info_handler( runtime_info: &'static RuntimeInfo, config: Arc, ) -> impl Filter + Clone { - node_version_handler(build_info, runtime_info).or(node_config_handler(config)) + node_version_handler(build_info, runtime_info) + .or(node_config_handler(config)) + .recover(recover_fn) } #[utoipa::path(get, tag = "Node Info", path = "/version")] diff --git a/quickwit/quickwit-serve/src/openapi.rs b/quickwit/quickwit-serve/src/openapi.rs index a54b1fcd85..ec0c96b619 100644 --- a/quickwit/quickwit-serve/src/openapi.rs +++ b/quickwit/quickwit-serve/src/openapi.rs @@ -38,6 +38,7 @@ use crate::ingest_api::{IngestApi, IngestApiSchemas}; use crate::jaeger_api::JaegerApi; use crate::metrics_api::MetricsApi; use crate::node_info_handler::NodeInfoApi; +use crate::otlp_api::OtlpApi; use crate::search_api::SearchApi; use crate::template_api::IndexTemplateApi; @@ -78,7 +79,8 @@ pub fn build_docs() -> utoipa::openapi::OpenApi { Tag::new("Indexing"), Tag::new("Splits"), Tag::new("Jaeger"), - Tag::new("Debugging"), + Tag::new("Open Telemetry"), + Tag::new("Debug"), ]; docs_base.tags = Some(tags); @@ -89,6 +91,7 @@ pub fn build_docs() -> utoipa::openapi::OpenApi { .merge_components_and_paths(DeveloperApi::openapi().with_path_prefix("/api/developer")); docs_base .merge_components_and_paths(ElasticCompatibleApi::openapi().with_path_prefix("/api/v1")); + docs_base.merge_components_and_paths(OtlpApi::openapi().with_path_prefix("/api/v1")); docs_base.merge_components_and_paths(HealthCheckApi::openapi().with_path_prefix("/health")); docs_base.merge_components_and_paths(IndexApi::openapi().with_path_prefix("/api/v1")); docs_base.merge_components_and_paths(IndexingApi::openapi().with_path_prefix("/api/v1")); diff --git a/quickwit/quickwit-serve/src/otlp_api/mod.rs b/quickwit/quickwit-serve/src/otlp_api/mod.rs index 6a73fb17ed..df063dfa9e 100644 --- a/quickwit/quickwit-serve/src/otlp_api/mod.rs +++ b/quickwit/quickwit-serve/src/otlp_api/mod.rs @@ -19,3 +19,4 @@ mod rest_handler; pub(crate) use rest_handler::otlp_ingest_api_handlers; +pub use rest_handler::OtlpApi; diff --git a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs index 0334a8b478..0b61f835e4 100644 --- a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs @@ -35,20 +35,35 @@ use serde::{self, Serialize}; use tracing::error; use warp::{Filter, Rejection}; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; use crate::{require, with_arg, BodyFormat}; +#[derive(utoipa::OpenApi)] +#[openapi(paths(otlp_default_logs_handler, otlp_default_traces_handler))] +pub struct OtlpApi; + /// Setup OpenTelemetry API handlers. pub(crate) fn otlp_ingest_api_handlers( otlp_logs_service: Option, otlp_traces_service: Option, ) -> impl Filter + Clone { otlp_default_logs_handler(otlp_logs_service.clone()) - .or(otlp_default_traces_handler(otlp_traces_service.clone())) - .or(otlp_logs_handler(otlp_logs_service)) - .or(otlp_ingest_traces_handler(otlp_traces_service)) + .or(otlp_default_traces_handler(otlp_traces_service.clone()).recover(recover_fn)) + .or(otlp_logs_handler(otlp_logs_service).recover(recover_fn)) + .or(otlp_ingest_traces_handler(otlp_traces_service).recover(recover_fn)) } +/// Open Telemetry REST/Protobuf logs ingest endpoint. +#[utoipa::path( + post, + tag = "Open Telemetry", + path = "/otlp/v1/logs", + request_body(content = String, description = "`ExportLogsServiceRequest` protobuf message", content_type = "application/x-protobuf"), + responses( + (status = 200, description = "Successfully exported logs.", body = ExportLogsServiceResponse) + ), +)] pub(crate) fn otlp_default_logs_handler( otlp_logs_service: Option, ) -> impl Filter + Clone { @@ -83,6 +98,16 @@ pub(crate) fn otlp_logs_handler( .map(into_rest_api_response) } +/// Open Telemetry REST/Protobuf traces ingest endpoint. +#[utoipa::path( + post, + tag = "Open Telemetry", + path = "/otlp/v1/traces", + request_body(content = String, description = "`ExportTraceServiceRequest` protobuf message", content_type = "application/x-protobuf"), + responses( + (status = 200, description = "Successfully exported traces.", body = ExportTracesServiceResponse) + ), +)] pub(crate) fn otlp_default_traces_handler( otlp_traces_service: Option, ) -> impl Filter + Clone { diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 8945faf783..e2818b3592 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use hyper::http::HeaderValue; use hyper::{http, Method, StatusCode}; use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_search::SearchService; use tower::make::Shared; use tower::ServiceBuilder; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; @@ -140,7 +141,8 @@ pub(crate) async fn start_rest_server( // Docs routes let api_doc = warp::path("openapi.json") .and(warp::get()) - .map(|| warp::reply::json(&crate::openapi::build_docs())); + .map(|| warp::reply::json(&crate::openapi::build_docs())) + .recover(recover_fn); // `/health/*` routes. let health_check_routes = health_check_handlers( @@ -150,7 +152,10 @@ pub(crate) async fn start_rest_server( ); // `/metrics` route. - let metrics_routes = warp::path("metrics").and(warp::get()).map(metrics_handler); + let metrics_routes = warp::path("metrics") + .and(warp::get()) + .map(metrics_handler) + .recover(recover_fn); // `/api/developer/*` route. let developer_routes = developer_api_routes( @@ -162,7 +167,8 @@ pub(crate) async fn start_rest_server( let redirect_root_to_ui_route = warp::path::end() .and(warp::get()) - .map(|| redirect(http::Uri::from_static("/ui/search"))); + .map(|| redirect(http::Uri::from_static("/ui/search"))) + .recover(recover_fn); let extra_headers = warp::reply::with::headers( quickwit_services @@ -181,7 +187,7 @@ pub(crate) async fn start_rest_server( .or(metrics_routes) .or(developer_routes) .with(request_counter) - .recover(recover_fn) + .recover(recover_fn_final) .with(extra_headers) .boxed(); @@ -223,57 +229,60 @@ pub(crate) async fn start_rest_server( Ok(()) } +fn search_routes( + search_service: Arc, +) -> impl Filter + Clone { + search_get_handler(search_service.clone()) + .or(search_post_handler(search_service.clone())) + .or(search_stream_handler(search_service)) + .recover(recover_fn) +} + fn api_v1_routes( quickwit_services: Arc, ) -> impl Filter + Clone { let api_v1_root_url = warp::path!("api" / "v1" / ..); api_v1_root_url.and( - cluster_handler(quickwit_services.cluster.clone()) - .or(node_info_handler( - BuildInfo::get(), - RuntimeInfo::get(), - quickwit_services.node_config.clone(), - )) - .or(indexing_get_handler( - quickwit_services.indexing_service_opt.clone(), - )) - .or(search_get_handler(quickwit_services.search_service.clone())) - .or(search_post_handler( - quickwit_services.search_service.clone(), - )) - .or(search_stream_handler( - quickwit_services.search_service.clone(), - )) - .or(ingest_api_handlers( - quickwit_services.ingest_router_service.clone(), - quickwit_services.ingest_service.clone(), - quickwit_services.node_config.ingest_api_config.clone(), - )) - .or(otlp_ingest_api_handlers( - quickwit_services.otlp_logs_service_opt.clone(), - quickwit_services.otlp_traces_service_opt.clone(), - )) - .or(index_management_handlers( - quickwit_services.index_manager.clone(), - quickwit_services.node_config.clone(), - )) - .or(delete_task_api_handlers( - quickwit_services.metastore_client.clone(), - )) - .or(jaeger_api_handlers( - quickwit_services.jaeger_service_opt.clone(), - )) - .or(elastic_api_handlers( - quickwit_services.node_config.clone(), - quickwit_services.search_service.clone(), - quickwit_services.ingest_service.clone(), - quickwit_services.ingest_router_service.clone(), - quickwit_services.metastore_client.clone(), - quickwit_services.index_manager.clone(), - )) - .or(index_template_api_handlers( - quickwit_services.metastore_client.clone(), - )), + elastic_api_handlers( + quickwit_services.node_config.clone(), + quickwit_services.search_service.clone(), + quickwit_services.ingest_service.clone(), + quickwit_services.ingest_router_service.clone(), + quickwit_services.metastore_client.clone(), + quickwit_services.index_manager.clone(), + ) + .or(cluster_handler(quickwit_services.cluster.clone())) + .or(node_info_handler( + BuildInfo::get(), + RuntimeInfo::get(), + quickwit_services.node_config.clone(), + )) + .or(indexing_get_handler( + quickwit_services.indexing_service_opt.clone(), + )) + .or(search_routes(quickwit_services.search_service.clone())) + .or(ingest_api_handlers( + quickwit_services.ingest_router_service.clone(), + quickwit_services.ingest_service.clone(), + quickwit_services.node_config.ingest_api_config.clone(), + )) + .or(otlp_ingest_api_handlers( + quickwit_services.otlp_logs_service_opt.clone(), + quickwit_services.otlp_traces_service_opt.clone(), + )) + .or(index_management_handlers( + quickwit_services.index_manager.clone(), + quickwit_services.node_config.clone(), + )) + .or(delete_task_api_handlers( + quickwit_services.metastore_client.clone(), + )) + .or(jaeger_api_handlers( + quickwit_services.jaeger_service_opt.clone(), + )) + .or(index_template_api_handlers( + quickwit_services.metastore_client.clone(), + )), ) } @@ -289,7 +298,30 @@ fn api_v1_routes( // More on this here: https://github.com/seanmonstar/warp/issues/388. // We may use this work on the PR is merged: https://github.com/seanmonstar/warp/pull/909. pub async fn recover_fn(rejection: Rejection) -> Result { - let error = get_status_with_error(rejection); + let error = get_status_with_error(rejection)?; + let status_code = error.status_code; + Ok(RestApiResponse::new::<(), _>( + &Err(error), + status_code, + BodyFormat::default(), + )) +} + +pub async fn recover_fn_final(rejection: Rejection) -> Result { + let error = get_status_with_error(rejection).unwrap_or_else(|rejection: Rejection| { + if rejection.is_not_found() { + RestApiError { + status_code: StatusCode::NOT_FOUND, + message: "Route not found".to_string(), + } + } else { + error!("REST server error: {:?}", rejection); + RestApiError { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + message: "internal server error".to_string(), + } + } + }); let status_code = error.status_code; Ok(RestApiResponse::new::<(), _>( &Err(error), @@ -298,96 +330,87 @@ pub async fn recover_fn(rejection: Rejection) -> Result { )) } -fn get_status_with_error(rejection: Rejection) -> RestApiError { +fn get_status_with_error(rejection: Rejection) -> Result { if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { // Happens when the request body could not be deserialized correctly. - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.0.to_string(), - } + }) } else if let Some(error) = rejection.find::() { // Happens when the request body could not be deserialized correctly. - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::LENGTH_REQUIRED, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::PAYLOAD_TOO_LARGE, message: error.to_string(), - } + }) } else if let Some(err) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::TOO_MANY_REQUESTS, message: err.to_string(), - } + }) } else if let Some(error) = rejection.find::() { // Happens when the url path or request body contains invalid argument(s). - RestApiError { + Ok(RestApiError { status_code: StatusCode::BAD_REQUEST, message: error.0.to_string(), - } + }) } else if let Some(error) = rejection.find::() { - RestApiError { + Ok(RestApiError { status_code: StatusCode::METHOD_NOT_ALLOWED, message: error.to_string(), - } - } else if rejection.is_not_found() { - RestApiError { - status_code: StatusCode::NOT_FOUND, - message: "Route not found".to_string(), - } + }) } else { - error!("REST server error: {:?}", rejection); - RestApiError { - status_code: StatusCode::INTERNAL_SERVER_ERROR, - message: "internal server error".to_string(), - } + Err(rejection) } } @@ -438,6 +461,7 @@ mod tests { use tower::Service; use super::*; + use crate::rest::recover_fn_final; pub(crate) fn ingest_service_client() -> IngestServiceClient { let universe = quickwit_actors::Universe::new(); @@ -700,7 +724,7 @@ mod tests { }; let handler = api_v1_routes(Arc::new(quickwit_services)) - .recover(recover_fn) + .recover(recover_fn_final) .with(warp::reply::with::headers( node_config.rest_config.extra_headers.clone(), )); diff --git a/quickwit/quickwit-serve/src/template_api/rest_handler.rs b/quickwit/quickwit-serve/src/template_api/rest_handler.rs index 2600939b29..8b4c76cba5 100644 --- a/quickwit/quickwit-serve/src/template_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/template_api/rest_handler.rs @@ -31,6 +31,7 @@ use warp::reject::Rejection; use warp::{Filter, Reply}; use crate::format::{extract_config_format, extract_format_from_qs}; +use crate::rest::recover_fn; use crate::rest_api_response::into_rest_api_response; use crate::with_arg; @@ -55,6 +56,7 @@ pub(crate) fn index_template_api_handlers( .or(update_index_template_handler(metastore.clone())) .or(delete_index_template_handler(metastore.clone())) .or(list_index_templates_handler(metastore.clone())) + .recover(recover_fn) } fn create_index_template_handler( diff --git a/quickwit/quickwit-serve/src/ui_handler.rs b/quickwit/quickwit-serve/src/ui_handler.rs index bcd253f5a1..2c78712381 100644 --- a/quickwit/quickwit-serve/src/ui_handler.rs +++ b/quickwit/quickwit-serve/src/ui_handler.rs @@ -26,6 +26,8 @@ use warp::path::Tail; use warp::reply::Response; use warp::{Filter, Rejection}; +use crate::rest::recover_fn; + /// Regular expression to identify which path should serve an asset file. /// If not matched, the server serves the `index.html` file. const PATH_PATTERN: &str = r"(^static|\.(png|json|txt|ico|js|map)$)"; @@ -40,6 +42,7 @@ pub fn ui_handler() -> impl Filter Result { diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml index 4a8101d9f7..f2d92991e9 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml @@ -90,5 +90,3 @@ expected: count: 0 docs: count: 0 - -