Skip to content

Commit

Permalink
Adds recover_fn to many routes. (#5006)
Browse files Browse the repository at this point in the history
* Adds recover_fn to many routes.

If a route matches, we want to run its handler, and upon failure,
the last thing we want is for another overlapping route to be executed
as a fallback.

To avoid this, we call recover on each individual routes that have no
overlap with routes of lesser priority.

* CR comments

* Added recover earlier in the tree.

* Reorganizing openapi doc and adding otel endpoint to OTEL REST endpoints
  • Loading branch information
fulmicoton committed Jun 11, 2024
1 parent 66b025f commit 5df205d
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 125 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.type_attribute(".", "#[derive(Serialize, Deserialize)]")
.type_attribute("StatusCode", r#"#[serde(rename_all = "snake_case")]"#)
.type_attribute(
"ExportLogsServiceResponse",
r#"#[derive(utoipa::ToSchema)]"#,
)
.out_dir("src/codegen/opentelemetry")
.compile_with_config(prost_config, &protos, &["protos/third-party"])?;
Ok(())
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/cluster_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use quickwit_cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
use warp::{Filter, Rejection};

use crate::format::extract_format_from_qs;
use crate::rest::recover_fn;
use crate::rest_api_response::into_rest_api_response;

#[derive(utoipa::OpenApi)]
Expand All @@ -43,6 +44,7 @@ pub fn cluster_handler(
.then(get_cluster)
.and(extract_format_from_qs())
.map(into_rest_api_response)
.recover(recover_fn)
}

#[utoipa::path(
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/delete_task_api/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use serde::Deserialize;
use warp::{Filter, Rejection};

use crate::format::extract_format_from_qs;
use crate::rest::recover_fn;
use crate::rest_api_response::into_rest_api_response;
use crate::with_arg;

Expand Down Expand Up @@ -61,7 +62,9 @@ pub struct DeleteQueryRequest {
pub fn delete_task_api_handlers(
metastore: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
get_delete_tasks_handler(metastore.clone()).or(post_delete_tasks_handler(metastore.clone()))
get_delete_tasks_handler(metastore.clone())
.or(post_delete_tasks_handler(metastore.clone()))
.recover(recover_fn)
}

pub fn get_delete_tasks_handler(
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-serve/src/developer_api/log_level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ struct EnvFilter {
filter: String,
}

#[utoipa::path(get, tag = "Log level", path = "/log-level")]
/// Dynamically Quickwit's log level
#[utoipa::path(get, tag = "Debug", path = "/log-level")]
pub fn log_level_handler(
env_filter_reload_fn: EnvFilterReloadFn,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down
13 changes: 8 additions & 5 deletions quickwit/quickwit-serve/src/developer_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use quickwit_cluster::Cluster;
pub(crate) use server::DeveloperApiServer;
use warp::{Filter, Rejection};

use crate::rest::recover_fn;
use crate::EnvFilterReloadFn;

#[derive(utoipa::OpenApi)]
Expand All @@ -39,9 +40,11 @@ pub(crate) fn developer_api_routes(
cluster: Cluster,
env_filter_reload_fn: EnvFilterReloadFn,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
warp::path!("api" / "developer" / ..).and(
debug_handler(cluster.clone())
.or(log_level_handler(env_filter_reload_fn.clone()))
.or(pprof_handlers()),
)
warp::path!("api" / "developer" / ..)
.and(
debug_handler(cluster.clone())
.or(log_level_handler(env_filter_reload_fn.clone()))
.or(pprof_handlers()),
)
.recover(recover_fn)
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::elasticsearch_api::make_elastic_api_response;
use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
use crate::format::extract_format_from_qs;
use crate::ingest_api::lines;
use crate::rest::recover_fn;
use crate::{with_arg, Body};

/// POST `_elastic/_bulk`
Expand All @@ -50,6 +51,7 @@ pub fn es_compat_bulk_handler(
})
.and(extract_format_from_qs())
.map(make_elastic_api_response)
.recover(recover_fn)
}

/// POST `_elastic/<index>/_bulk`
Expand All @@ -73,6 +75,7 @@ pub fn es_compat_index_bulk_handler(
)
.and(extract_format_from_qs())
.map(make_elastic_api_response)
.recover(recover_fn)
}

async fn elastic_ingest_bulk(
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ pub(crate) fn elastic_index_count_filter(
pub(crate) fn elastic_delete_index_filter(
) -> impl Filter<Extract = (Vec<String>, DeleteQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / String)
.and_then(extract_index_id_patterns)
.and(warp::delete())
.and_then(extract_index_id_patterns)
.and(serde_qs::warp::query(serde_qs::Config::default()))
}

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use crate::elasticsearch_api::model::ElasticsearchError;
use crate::rest::recover_fn;
use crate::rest_api_response::RestApiResponse;
use crate::{BodyFormat, BuildInfo};

Expand Down Expand Up @@ -79,6 +80,7 @@ pub fn elastic_api_handlers(
.or(es_compat_stats_handler(metastore.clone()))
.or(es_compat_index_cat_indices_handler(metastore.clone()))
.or(es_compat_cat_indices_handler(metastore.clone()))
.recover(recover_fn)
// Register newly created handlers here.
}

Expand Down
42 changes: 27 additions & 15 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use super::model::{
};
use super::{make_elastic_api_response, TrackTotalHits};
use crate::format::BodyFormat;
use crate::rest::recover_fn;
use crate::rest_api_response::{RestApiError, RestApiResponse};
use crate::{with_arg, BuildInfo};

Expand Down Expand Up @@ -93,20 +94,22 @@ pub fn es_compat_cluster_info_handler(
pub fn es_compat_search_handler(
_search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elasticsearch_filter().then(|_params: SearchQueryParams| async move {
// TODO
let api_error = RestApiError {
status_code: StatusCode::NOT_IMPLEMENTED,
message: "_elastic/_search is not supported yet. Please try the index search endpoint \
(_elastic/{index}/search)"
.to_string(),
};
RestApiResponse::new::<(), _>(
&Err(api_error),
StatusCode::NOT_IMPLEMENTED,
BodyFormat::default(),
)
})
elasticsearch_filter()
.then(|_params: SearchQueryParams| async move {
// TODO
let api_error = RestApiError {
status_code: StatusCode::NOT_IMPLEMENTED,
message: "_elastic/_search is not supported yet. Please try the index search \
endpoint (_elastic/{index}/search)"
.to_string(),
};
RestApiResponse::new::<(), _>(
&Err(api_error),
StatusCode::NOT_IMPLEMENTED,
BodyFormat::default(),
)
})
.recover(recover_fn)
}

/// GET or POST _elastic/{index}/_field_caps
Expand All @@ -119,6 +122,7 @@ pub fn es_compat_index_field_capabilities_handler(
.and(with_arg(search_service))
.then(es_compat_index_field_capabilities)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// DELETE _elastic/{index}
Expand All @@ -139,6 +143,7 @@ pub fn es_compat_stats_handler(
.and(with_arg(search_service))
.then(es_compat_stats)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// GET _elastic/{index}/_stats
Expand All @@ -149,6 +154,7 @@ pub fn es_compat_index_stats_handler(
.and(with_arg(search_service))
.then(es_compat_index_stats)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// GET _elastic/_cat/indices
Expand All @@ -159,6 +165,7 @@ pub fn es_compat_cat_indices_handler(
.and(with_arg(search_service))
.then(es_compat_cat_indices)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// GET _elastic/_cat/indices/{index}
Expand All @@ -169,6 +176,7 @@ pub fn es_compat_index_cat_indices_handler(
.and(with_arg(search_service))
.then(es_compat_index_cat_indices)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// GET or POST _elastic/{index}/_search
Expand All @@ -179,6 +187,7 @@ pub fn es_compat_index_search_handler(
.and(with_arg(search_service))
.then(es_compat_index_search)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// GET or POST _elastic/{index}/_count
Expand All @@ -189,9 +198,10 @@ pub fn es_compat_index_count_handler(
.and(with_arg(search_service))
.then(es_compat_index_count)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

/// POST _elastic/_search
/// POST _elastic/_msearch
pub fn es_compat_index_multi_search_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand All @@ -205,6 +215,7 @@ pub fn es_compat_index_multi_search_handler(
};
RestApiResponse::new(&result, status_code, BodyFormat::default())
})
.recover(recover_fn)
}

/// GET or POST _elastic/_search/scroll
Expand All @@ -215,6 +226,7 @@ pub fn es_compat_scroll_handler(
.and(with_arg(search_service))
.then(es_scroll)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
.recover(recover_fn)
}

fn build_request_for_es_api(
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-serve/src/health_check_api/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use warp::hyper::StatusCode;
use warp::reply::with_status;
use warp::{Filter, Rejection};

use crate::rest::recover_fn;
use crate::with_arg;

#[derive(utoipa::OpenApi)]
Expand All @@ -50,6 +51,7 @@ fn liveness_handler(
.and(with_arg(indexer_service_opt))
.and(with_arg(janitor_service_opt))
.then(get_liveness)
.recover(recover_fn)
}

fn readiness_handler(
Expand All @@ -59,6 +61,7 @@ fn readiness_handler(
.and(warp::get())
.and(with_arg(cluster))
.then(get_readiness)
.recover(recover_fn)
}

#[utoipa::path(
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tracing::{info, warn};
use warp::{Filter, Rejection};

use crate::format::{extract_config_format, extract_format_from_qs};
use crate::rest::recover_fn;
use crate::rest_api_response::into_rest_api_response;
use crate::simple_list::{from_simple_list, to_simple_list};
use crate::with_arg;
Expand Down Expand Up @@ -107,6 +108,7 @@ pub fn index_management_handlers(
.or(analyze_request_handler())
// Parse query into query AST handler.
.or(parse_query_request_handler())
.recover(recover_fn)
}

fn json_body<T: DeserializeOwned + Send>(
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/indexing_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use warp::{Filter, Rejection};

use crate::format::extract_format_from_qs;
use crate::require;
use crate::rest::recover_fn;
use crate::rest_api_response::into_rest_api_response;

#[derive(utoipa::OpenApi)]
Expand Down Expand Up @@ -60,4 +61,5 @@ pub fn indexing_get_handler(
.then(indexing_endpoint)
.and(extract_format_from_qs())
.map(into_rest_api_response)
.recover(recover_fn)
}
6 changes: 4 additions & 2 deletions quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::jaeger_api::model::{
JaegerError, JaegerResponseBody, JaegerSpan, JaegerTrace, TracesSearchQueryParams,
DEFAULT_NUMBER_OF_TRACES,
};
use crate::rest::recover_fn;
use crate::rest_api_response::RestApiResponse;
use crate::search_api::extract_index_id_patterns;
use crate::{require, BodyFormat};
Expand Down Expand Up @@ -66,6 +67,7 @@ pub(crate) fn jaeger_api_handlers(
))
.or(jaeger_traces_search_handler(jaeger_service_opt.clone()))
.or(jaeger_traces_handler(jaeger_service_opt.clone()))
.recover(recover_fn)
}

fn jaeger_api_path_filter() -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
Expand Down Expand Up @@ -349,13 +351,13 @@ mod tests {

#[tokio::test]
async fn test_when_jaeger_not_found() {
let jaeger_api_handler = jaeger_api_handlers(None).recover(recover_fn);
let jaeger_api_handler = jaeger_api_handlers(None).recover(crate::rest::recover_fn_final);
let resp = warp::test::request()
.path("/otel-traces-v0_7/jaeger/api/services")
.reply(&jaeger_api_handler)
.await;
let error_body = serde_json::from_slice::<HashMap<String, String>>(resp.body()).unwrap();
assert_eq!(resp.status(), 404);
let error_body = serde_json::from_slice::<HashMap<String, String>>(resp.body()).unwrap();
assert!(error_body.contains_key("message"));
assert_eq!(error_body.get("message").unwrap(), "Route not found");
}
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/node_info_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use quickwit_config::NodeConfig;
use serde_json::json;
use warp::{Filter, Rejection};

use crate::rest::recover_fn;
use crate::{with_arg, BuildInfo, RuntimeInfo};

#[derive(utoipa::OpenApi)]
Expand All @@ -34,7 +35,9 @@ pub fn node_info_handler(
runtime_info: &'static RuntimeInfo,
config: Arc<NodeConfig>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
node_version_handler(build_info, runtime_info).or(node_config_handler(config))
node_version_handler(build_info, runtime_info)
.or(node_config_handler(config))
.recover(recover_fn)
}

#[utoipa::path(get, tag = "Node Info", path = "/version")]
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::ingest_api::{IngestApi, IngestApiSchemas};
use crate::jaeger_api::JaegerApi;
use crate::metrics_api::MetricsApi;
use crate::node_info_handler::NodeInfoApi;
use crate::otlp_api::OtlpApi;
use crate::search_api::SearchApi;
use crate::template_api::IndexTemplateApi;

Expand Down Expand Up @@ -78,7 +79,8 @@ pub fn build_docs() -> utoipa::openapi::OpenApi {
Tag::new("Indexing"),
Tag::new("Splits"),
Tag::new("Jaeger"),
Tag::new("Debugging"),
Tag::new("Open Telemetry"),
Tag::new("Debug"),
];
docs_base.tags = Some(tags);

Expand All @@ -89,6 +91,7 @@ pub fn build_docs() -> utoipa::openapi::OpenApi {
.merge_components_and_paths(DeveloperApi::openapi().with_path_prefix("/api/developer"));
docs_base
.merge_components_and_paths(ElasticCompatibleApi::openapi().with_path_prefix("/api/v1"));
docs_base.merge_components_and_paths(OtlpApi::openapi().with_path_prefix("/api/v1"));
docs_base.merge_components_and_paths(HealthCheckApi::openapi().with_path_prefix("/health"));
docs_base.merge_components_and_paths(IndexApi::openapi().with_path_prefix("/api/v1"));
docs_base.merge_components_and_paths(IndexingApi::openapi().with_path_prefix("/api/v1"));
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/otlp_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

mod rest_handler;
pub(crate) use rest_handler::otlp_ingest_api_handlers;
pub use rest_handler::OtlpApi;
Loading

0 comments on commit 5df205d

Please sign in to comment.