diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 2610fa1eb3..a5ec8517ca 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5913,7 +5913,6 @@ dependencies = [
  "dyn-clone",
  "fail",
  "flume",
- "fnv",
  "futures",
  "http 0.2.12",
  "hyper 0.14.28",
diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs
index fcca994712..8ea6b81b2c 100644
--- a/quickwit/quickwit-indexing/failpoints/mod.rs
+++ b/quickwit/quickwit-indexing/failpoints/mod.rs
@@ -211,7 +211,7 @@ async fn aux_test_failpoints() -> anyhow::Result<()> {
     Ok(())
 }
 
-const TEST_TEXT: &'static str = r#"His sole child, my lord, and bequeathed to my
+const TEST_TEXT: &str = r#"His sole child, my lord, and bequeathed to my
 overlooking. I have those hopes of her good that
 her education promises; her dispositions she
 inherits, which makes fair gifts fairer; for where
diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml
index 8e7ab7c246..b9126f5a15 100644
--- a/quickwit/quickwit-ingest/Cargo.toml
+++ b/quickwit/quickwit-ingest/Cargo.toml
@@ -16,9 +16,8 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 bytesize = { workspace = true }
 dyn-clone = { workspace = true }
-fail = { workspace = true }
+fail = { workspace = true, optional = true }
 flume = { workspace = true }
-fnv = { workspace = true }
 futures = { workspace = true }
 http = { workspace = true }
 hyper = { workspace = true }
@@ -59,4 +58,5 @@ quickwit-proto = { workspace = true, features = ["testsuite"] }
 quickwit-codegen = { workspace = true }
 
 [features]
+failpoints = ["fail/failpoints"]
 testsuite = ["mockall"]
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 89a058a765..09fd0fa7f9 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -27,7 +27,6 @@ use std::time::{Duration, Instant};
 use anyhow::Context;
 use async_trait::async_trait;
 use bytesize::ByteSize;
-use fnv::FnvHashMap;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use mrecordlog::error::CreateQueueError;
@@ -190,11 +189,15 @@ impl Ingester {
         };
         match mrecordlog.create_queue(&queue_id).await {
             Ok(_) => {}
-            Err(CreateQueueError::AlreadyExists) => panic!("queue should not exist"),
+            Err(CreateQueueError::AlreadyExists) => {
+                error!("WAL queue `{queue_id}` already exists");
+                let message = format!("WAL queue `{queue_id}` already exists");
+                return Err(IngestV2Error::Internal(message));
+            }
             Err(CreateQueueError::IoError(io_error)) => {
-                // TODO: Close all shards and set readiness to false.
- error!("failed to create mrecordlog queue `{queue_id}`: {io_error}"); - return Err(IngestV2Error::Internal(format!("Io Error: {io_error}"))); + error!("failed to create WAL queue `{queue_id}`: {io_error}",); + let message = format!("failed to create WAL queue `{queue_id}`: {io_error}"); + return Err(IngestV2Error::Internal(message)); } }; let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings); @@ -348,7 +351,7 @@ impl Ingester { async fn init_replication_stream( &self, - replication_streams: &mut FnvHashMap, + replication_streams: &mut HashMap, leader_id: NodeId, follower_id: NodeId, ) -> IngestV2Result { @@ -460,7 +463,8 @@ impl Ingester { // first verify if we would locally accept each subrequest { - let mut sum_of_requested_capacity = bytesize::ByteSize::b(0); + let mut total_requested_capacity = bytesize::ByteSize::b(0); + for subrequest in persist_request.subrequests { let queue_id = subrequest.queue_id(); @@ -515,7 +519,7 @@ impl Ingester { &state_guard.mrecordlog, self.disk_capacity, self.memory_capacity, - requested_capacity + sum_of_requested_capacity, + requested_capacity + total_requested_capacity, ) { rate_limited_warn!( limit_per_min = 10, @@ -553,7 +557,7 @@ impl Ingester { let batch_num_bytes = doc_batch.num_bytes() as u64; rate_meter.update(batch_num_bytes); - sum_of_requested_capacity += requested_capacity; + total_requested_capacity += requested_capacity; if let Some(follower_id) = follower_id_opt { let replicate_subrequest = ReplicateSubrequest { @@ -736,7 +740,6 @@ impl Ingester { persist_successes.push(persist_success); } } - if !shards_to_close.is_empty() { for queue_id in &shards_to_close { let shard = state_guard @@ -745,18 +748,15 @@ impl Ingester { .expect("shard should exist"); shard.close(); + warn!("closed shard `{queue_id}` following IO error"); } - info!( - "closed {} shard(s) following IO error(s)", - shards_to_close.len() - ); } if !shards_to_delete.is_empty() { for queue_id in &shards_to_delete { state_guard.shards.remove(queue_id); state_guard.rate_trackers.remove(queue_id); + warn!("deleted dangling shard `{queue_id}`"); } - info!("deleted {} dangling shard(s)", shards_to_delete.len()); } let wal_usage = state_guard.mrecordlog.resource_usage(); drop(state_guard); @@ -1613,10 +1613,13 @@ mod tests { ); } - // This test should be run manually and independently of other tests with the `fail/failpoints` - // feature enabled. 
+    // This test should be run manually and independently of other tests with the `failpoints`
+    // feature enabled:
+    // ```sh
+    // cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_ingester_persist_closes_shard_on_io_error
+    // ```
+    #[cfg(feature = "failpoints")]
     #[tokio::test]
-    #[ignore]
     async fn test_ingester_persist_closes_shard_on_io_error() {
         let scenario = fail::FailScenario::setup();
         fail::cfg("ingester:append_records", "return").unwrap();
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index bb4826f57f..0f57f1e8c3 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -33,6 +33,7 @@ mod routing_table;
 mod state;
 mod workbench;
 
+use std::collections::HashMap;
 use std::ops::{Add, AddAssign};
 use std::time::Duration;
 use std::{env, fmt};
@@ -40,7 +41,6 @@ use std::{env, fmt};
 pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos};
 use bytes::{BufMut, BytesMut};
 use bytesize::ByteSize;
-use fnv::FnvHashMap;
 use quickwit_common::tower::Pool;
 use quickwit_proto::ingest::ingester::IngesterServiceClient;
 use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest};
@@ -138,7 +138,7 @@ impl DocBatchV2Builder {
 /// Helper struct to build an [`IngestRequestV2`].
 #[derive(Debug, Default)]
 pub struct IngestRequestV2Builder {
-    per_index_id_doc_batch_builders: FnvHashMap<IndexId, DocBatchV2Builder>,
+    per_index_id_doc_batch_builders: HashMap<IndexId, DocBatchV2Builder>,
 }
 
 impl IngestRequestV2Builder {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
index f67c28c467..1b40e01618 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -22,6 +22,7 @@ use std::iter::once;
 use std::ops::RangeInclusive;
 
 use bytesize::ByteSize;
+#[cfg(feature = "failpoints")]
 use fail::fail_point;
 use mrecordlog::error::{AppendError, DeleteQueueError};
 use quickwit_proto::ingest::DocBatchV2;
@@ -54,19 +55,25 @@ pub(super) async fn append_non_empty_doc_batch(
             .docs()
             .map(|doc| MRecord::Doc(doc).encode())
             .chain(once(MRecord::Commit.encode()));
+
+        #[cfg(feature = "failpoints")]
         fail_point!("ingester:append_records", |_| {
             let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
             Err(AppendDocBatchError::Io(io_error))
         });
+
         mrecordlog
             .append_records(queue_id, None, encoded_mrecords)
             .await
     } else {
         let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
+
+        #[cfg(feature = "failpoints")]
         fail_point!("ingester:append_records", |_| {
             let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
             Err(AppendDocBatchError::Io(io_error))
         });
+
         mrecordlog
             .append_records(queue_id, None, encoded_mrecords)
             .await
@@ -203,10 +210,13 @@ mod tests {
         assert_eq!(position, Position::offset(2u64));
     }
 
-    // This test should be run manually and independently of other tests with the `fail/failpoints`
-    // feature enabled.
+    // This test should be run manually and independently of other tests with the `failpoints`
+    // feature enabled:
+    // ```sh
+    // cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_append_non_empty_doc_batch_io_error
+    // ```
+    #[cfg(feature = "failpoints")]
     #[tokio::test]
-    #[ignore]
     async fn test_append_non_empty_doc_batch_io_error() {
         let scenario = fail::FailScenario::setup();
         fail::cfg("ingester:append_records", "return").unwrap();
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.md b/quickwit/quickwit-ingest/src/ingest_v2/replication.md
index fdc4ab03ad..c95fa7d785 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.md
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.md
@@ -9,14 +9,20 @@ Two gRPC streams back the independent streams of requests and responses between
 ### Life of a happy persist request
 
 1. Leader receives a persist request pre-assigned to a shard from a router.
 
-1. Leader writes the data to the corresponding mrecordlog queue and records the new position of the queue called `primary_position`.
-
-1. Leader sends replicate request to follower of the shard via the SYN replication stream.
+1. Leader forwards replicate request to follower of the shard via the SYN replication stream.
 
 1. Follower receives the replicate request, writes the data to its replica queue, and records the new position of the queue called `replica_position`.
 
 1. Follower returns replicate response to leader via the ACK replication stream.
 
-1. Leader records the new position of the replica queue. It should match the `primary_position`.
+1. Leader records the new position of the replica queue.
+
+1. Leader writes the data to its local mrecordlog queue and records the new position of the queue called `primary_position`. It should match the `replica_position`.
 
 1. Leader return success persist response to router.
+
+### Replication stream errors
+
+- When a replication request fails, the leader and follower close the shard(s) targeted by the request.
+
+- When a replication stream fails (transport error, timeout), the leader and follower close the shard(s) targeted by the stream. Then, the leader opens a new stream if necessary.
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
index a32d71e38f..e8265e1d4f 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -17,11 +17,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::iter::once;
+use std::collections::HashSet;
 use std::time::{Duration, Instant};
 
 use bytesize::ByteSize;
 use futures::{Future, StreamExt};
+use mrecordlog::error::CreateQueueError;
 use quickwit_common::{rate_limited_warn, ServiceStream};
 use quickwit_proto::ingest::ingester::{
     ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus,
@@ -30,7 +31,7 @@ use quickwit_proto::ingest::ingester::{
     SynReplicationMessage,
 };
 use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardState};
-use quickwit_proto::types::{NodeId, Position};
+use quickwit_proto::types::{NodeId, Position, QueueId};
 use tokio::sync::mpsc::error::TryRecvError;
 use tokio::sync::{mpsc, oneshot};
 use tokio::task::JoinHandle;
@@ -38,9 +39,9 @@ use tracing::{error, warn};
 
 use super::metrics::report_wal_usage;
 use super::models::IngesterShard;
-use super::mrecord::MRecord;
 use super::mrecordlog_utils::check_enough_capacity;
 use super::state::IngesterState;
+use crate::ingest_v2::mrecordlog_utils::{append_non_empty_doc_batch, AppendDocBatchError};
 use crate::metrics::INGEST_METRICS;
 use crate::{estimate_size, with_lock_metrics};
@@ -455,12 +456,19 @@ impl ReplicationTask {
         let mut state_guard =
             with_lock_metrics!(self.state.lock_fully(), "init_replica", "write").await?;
 
-        state_guard
-            .mrecordlog
-            .create_queue(&queue_id)
-            .await
-            .expect("TODO: Handle IO error");
-
+        match state_guard.mrecordlog.create_queue(&queue_id).await {
+            Ok(_) => {}
+            Err(CreateQueueError::AlreadyExists) => {
+                error!("WAL queue `{queue_id}` already exists");
+                let message = format!("WAL queue `{queue_id}` already exists");
+                return Err(IngestV2Error::Internal(message));
+            }
+            Err(CreateQueueError::IoError(io_error)) => {
+                error!("failed to create WAL queue `{queue_id}`: {io_error}");
+                let message = format!("failed to create WAL queue `{queue_id}`: {io_error}");
+                return Err(IngestV2Error::Internal(message));
+            }
+        };
         let replica_shard = IngesterShard::new_replica(
             replica_shard.leader_id.into(),
             ShardState::Open,
@@ -507,6 +515,13 @@ impl ReplicationTask {
         let mut replicate_successes = Vec::with_capacity(replicate_request.subrequests.len());
         let mut replicate_failures = Vec::new();
 
+        // Keep track of the shards that need to be closed following an IO error.
+        let mut shards_to_close: HashSet<QueueId> = HashSet::new();
+
+        // Keep track of dangling shards, i.e., shards for which there is no longer a corresponding
+        // queue in the WAL and that should be deleted.
+        let mut shards_to_delete: HashSet<QueueId> = HashSet::new();
+
         let mut state_guard =
             with_lock_metrics!(self.state.lock_fully(), "replicate", "write").await?;
 
@@ -599,7 +614,6 @@ impl ReplicationTask {
                     "failed to replicate records to ingester `{}`: {error}",
                     self.follower_id,
                 );
-
                 let replicate_failure = ReplicateFailure {
                     subrequest_id: subrequest.subrequest_id,
                     index_uid: subrequest.index_uid,
@@ -610,26 +624,48 @@ impl ReplicationTask {
                 replicate_failures.push(replicate_failure);
                 continue;
             };
-            let current_position_inclusive: Position = if force_commit {
-                let encoded_mrecords = doc_batch
-                    .docs()
-                    .map(|doc| MRecord::Doc(doc).encode())
-                    .chain(once(MRecord::Commit.encode()));
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO")
-            } else {
-                let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO")
-            }
-            .map(Position::offset)
-            .expect("records should not be empty");
+            let append_result = append_non_empty_doc_batch(
+                &mut state_guard.mrecordlog,
+                &queue_id,
+                doc_batch,
+                force_commit,
+            )
+            .await;
+
+            let current_position_inclusive = match append_result {
+                Ok(current_position_inclusive) => current_position_inclusive,
+                Err(append_error) => {
+                    let reason = match &append_error {
+                        AppendDocBatchError::Io(io_error) => {
+                            error!("failed to replicate records to shard `{queue_id}`: {io_error}");
+                            shards_to_close.insert(queue_id);
+                            ReplicateFailureReason::ShardClosed
+                        }
+                        AppendDocBatchError::QueueNotFound(_) => {
+                            error!(
+                                "failed to replicate records to shard `{queue_id}`: WAL queue not \
+                                 found"
+                            );
+                            shards_to_delete.insert(queue_id);
+                            ReplicateFailureReason::ShardNotFound
+                        }
+                    };
+                    let replicate_failure = ReplicateFailure {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        reason: reason as i32,
+                    };
+                    replicate_failures.push(replicate_failure);
+                    continue;
+                }
+            };
+            state_guard
+                .shards
+                .get_mut(&queue_id)
+                .expect("replica shard should be initialized")
+                .set_replication_position_inclusive(current_position_inclusive.clone(), now);
 
             INGEST_METRICS
                 .replicated_num_bytes_total
@@ -638,13 +674,6 @@ impl ReplicationTask {
                 .replicated_num_docs_total
                 .inc_by(batch_num_docs);
 
-            let replica_shard = state_guard
-                .shards
-                .get_mut(&queue_id)
-                .expect("replica shard should be initialized");
-            replica_shard
-                .set_replication_position_inclusive(current_position_inclusive.clone(), now);
-
             let replicate_success = ReplicateSuccess {
                 subrequest_id: subrequest.subrequest_id,
                 index_uid: subrequest.index_uid,
@@ -654,6 +683,25 @@ impl ReplicationTask {
             };
             replicate_successes.push(replicate_success);
         }
+        if !shards_to_close.is_empty() {
+            for queue_id in &shards_to_close {
+                let shard = state_guard
+                    .shards
+                    .get_mut(queue_id)
+                    .expect("shard should exist");
+
+                shard.shard_state = ShardState::Closed;
+                shard.notify_shard_status();
+                warn!("closed shard `{queue_id}` following IO error");
+            }
+        }
+        if !shards_to_delete.is_empty() {
+            for queue_id in &shards_to_delete {
+                state_guard.shards.remove(queue_id);
+                state_guard.rate_trackers.remove(queue_id);
+                warn!("deleted dangling shard `{queue_id}`");
+            }
+        }
         let wal_usage = state_guard.mrecordlog.resource_usage();
         drop(state_guard);
 
@@ -1323,6 +1371,183 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_replication_task_deletes_dangling_shard() {
+        let leader_id: NodeId = "test-leader".into();
+        let follower_id: NodeId = "test-follower".into();
+        let (_temp_dir, state) = IngesterState::for_test().await;
+        let (syn_replication_stream_tx, syn_replication_stream) =
+            ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY);
+        let (ack_replication_stream_tx, mut ack_replication_stream) =
+            ServiceStream::new_unbounded();
+
+        let disk_capacity = ByteSize::mb(256);
+        let memory_capacity = ByteSize::mb(1);
+
+        let _replication_task_handle = ReplicationTask::spawn(
+            leader_id.clone(),
+            follower_id,
+            state.clone(),
+            syn_replication_stream,
+            ack_replication_stream_tx,
+            disk_capacity,
+            memory_capacity,
+        );
+
+        let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
+        let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
+        let replica_shard = IngesterShard::new_replica(
+            leader_id,
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+            Instant::now(),
+        );
+        state
+            .lock_fully()
+            .await
+            .unwrap()
+            .shards
+            .insert(queue_id_01.clone(), replica_shard);
+
+        let replicate_request = ReplicateRequest {
+            leader_id: "test-leader".to_string(),
+            follower_id: "test-follower".to_string(),
+            commit_type: CommitTypeV2::Auto as i32,
+            subrequests: vec![ReplicateSubrequest {
+                subrequest_id: 0,
+                index_uid: Some(index_uid.clone()),
+                source_id: "test-source".to_string(),
+                shard_id: Some(ShardId::from(1)),
+                doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
+                from_position_exclusive: Position::offset(0u64).into(),
+            }],
+            replication_seqno: 0,
+        };
+        let syn_replication_message =
+            SynReplicationMessage::new_replicate_request(replicate_request);
+        syn_replication_stream_tx
+            .send(syn_replication_message)
+            .await
+            .unwrap();
+        let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap();
+        let replicate_response = into_replicate_response(ack_replication_message);
+
+        assert_eq!(replicate_response.follower_id, "test-follower");
+        assert_eq!(replicate_response.successes.len(), 0);
+        assert_eq!(replicate_response.failures.len(), 1);
+
+        let replicate_failure = &replicate_response.failures[0];
+        assert_eq!(replicate_failure.index_uid(), &index_uid);
+        assert_eq!(replicate_failure.source_id, "test-source");
+        assert_eq!(replicate_failure.shard_id(), ShardId::from(1));
+        assert_eq!(
+            replicate_failure.reason(),
+            ReplicateFailureReason::ShardNotFound
+        );
+
+        let state_guard = state.lock_partially().await.unwrap();
+        assert!(!state_guard.shards.contains_key(&queue_id_01));
+    }
+
+    // This test should be run manually and independently of other tests with the `failpoints`
+    // feature enabled:
+    // ```sh
+    // cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_replication_task_closes_shard_on_io_error
+    // ```
+    #[cfg(feature = "failpoints")]
+    #[tokio::test]
+    async fn test_replication_task_closes_shard_on_io_error() {
+        let scenario = fail::FailScenario::setup();
+        fail::cfg("ingester:append_records", "return").unwrap();
+
+        let leader_id: NodeId = "test-leader".into();
+        let follower_id: NodeId = "test-follower".into();
+        let (_temp_dir, state) = IngesterState::for_test().await;
+        let (syn_replication_stream_tx, syn_replication_stream) =
+            ServiceStream::new_bounded(SYN_REPLICATION_STREAM_CAPACITY);
+        let (ack_replication_stream_tx, mut ack_replication_stream) =
+            ServiceStream::new_unbounded();
+
+        let disk_capacity = ByteSize::mb(256);
+        let memory_capacity = ByteSize::mb(1);
+
+        let _replication_task_handle = ReplicationTask::spawn(
+            leader_id.clone(),
+            follower_id,
+            state.clone(),
+            syn_replication_stream,
+            ack_replication_stream_tx,
+            disk_capacity,
+            memory_capacity,
+        );
+
+        let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
+        let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));
+        let replica_shard = IngesterShard::new_replica(
+            leader_id,
+            ShardState::Open,
+            Position::Beginning,
+            Position::Beginning,
+            Instant::now(),
+        );
+        let mut state_guard = state.lock_fully().await.unwrap();
+
+        state_guard
+            .shards
+            .insert(queue_id_01.clone(), replica_shard);
+
+        state_guard
+            .mrecordlog
+            .create_queue(&queue_id_01)
+            .await
+            .unwrap();
+
+        drop(state_guard);
+
+        let replicate_request = ReplicateRequest {
+            leader_id: "test-leader".to_string(),
+            follower_id: "test-follower".to_string(),
+            commit_type: CommitTypeV2::Auto as i32,
+            subrequests: vec![ReplicateSubrequest {
+                subrequest_id: 0,
+                index_uid: Some(index_uid.clone()),
+                source_id: "test-source".to_string(),
+                shard_id: Some(ShardId::from(1)),
+                doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
+                from_position_exclusive: Position::offset(0u64).into(),
+            }],
+            replication_seqno: 0,
+        };
+        let syn_replication_message =
+            SynReplicationMessage::new_replicate_request(replicate_request);
+        syn_replication_stream_tx
+            .send(syn_replication_message)
+            .await
+            .unwrap();
+        let ack_replication_message = ack_replication_stream.next().await.unwrap().unwrap();
+        let replicate_response = into_replicate_response(ack_replication_message);
+
+        assert_eq!(replicate_response.follower_id, "test-follower");
+        assert_eq!(replicate_response.successes.len(), 0);
+        assert_eq!(replicate_response.failures.len(), 1);
+
+        let replicate_failure = &replicate_response.failures[0];
+        assert_eq!(replicate_failure.index_uid(), &index_uid);
+        assert_eq!(replicate_failure.source_id, "test-source");
+        assert_eq!(replicate_failure.shard_id(), ShardId::from(1));
+        assert_eq!(
+            replicate_failure.reason(),
+            ReplicateFailureReason::ShardClosed
+        );
+
+        let state_guard = state.lock_partially().await.unwrap();
+        let replica_shard = state_guard.shards.get(&queue_id_01).unwrap();
+        replica_shard.assert_is_closed();
+
+        scenario.teardown();
+    }
+
     #[tokio::test]
     async fn test_replication_task_resource_exhausted() {
         let leader_id: NodeId = "test-leader".into();
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
index 2b221a78e9..769760dccf 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -17,13 +17,13 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::collections::HashMap;
 use std::fmt;
 use std::ops::{Deref, DerefMut};
 use std::path::Path;
 use std::sync::{Arc, Weak};
 use std::time::{Duration, Instant};
 
-use fnv::FnvHashMap;
 use mrecordlog::error::{DeleteQueueError, TruncateError};
 use quickwit_common::pretty::PrettyDisplay;
 use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
@@ -56,12 +56,12 @@ pub(super) struct IngesterState {
 }
 
 pub(super) struct InnerIngesterState {
-    pub shards: FnvHashMap<QueueId, IngesterShard>,
-    pub rate_trackers: FnvHashMap<QueueId, (RateLimiter, RateMeter)>,
+    pub shards: HashMap<QueueId, IngesterShard>,
+    pub rate_trackers: HashMap<QueueId, (RateLimiter, RateMeter)>,
     // Replication stream opened with followers.
-    pub replication_streams: FnvHashMap<FollowerId, ReplicationStreamTaskHandle>,
+    pub replication_streams: HashMap<FollowerId, ReplicationStreamTaskHandle>,
     // Replication tasks running for each replication stream opened with leaders.
-    pub replication_tasks: FnvHashMap<LeaderId, ReplicationTaskHandle>,
+    pub replication_tasks: HashMap<LeaderId, ReplicationTaskHandle>,
     status: IngesterStatus,
     status_tx: watch::Sender<IngesterStatus>,
 }
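Taken together, the ingester and replication changes converge on one error-handling pattern: an IO error from the WAL append marks the shard to be closed, a missing WAL queue marks the shard as dangling and to be deleted, and the shard table is only mutated after the per-subrequest loop. The sketch below illustrates that pattern in isolation; the types (`AppendError`, `FailureReason`, `Shard`, `QueueId`) are simplified stand-ins invented for this example, not the actual quickwit-ingest definitions.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for illustration only; the real code uses
// `AppendDocBatchError`, `ReplicateFailureReason`, `IngesterShard`, and `QueueId`.
type QueueId = String;

#[derive(Debug)]
enum AppendError {
    Io(std::io::Error),
    QueueNotFound(QueueId),
}

#[derive(Debug, PartialEq)]
enum FailureReason {
    ShardClosed,
    ShardNotFound,
}

#[derive(Debug, Default)]
struct Shard {
    closed: bool,
}

/// Maps an append error to a failure reason and records which shard must be
/// closed (IO error) or deleted (dangling WAL queue).
fn handle_append_error(
    queue_id: QueueId,
    error: &AppendError,
    shards_to_close: &mut HashSet<QueueId>,
    shards_to_delete: &mut HashSet<QueueId>,
) -> FailureReason {
    match error {
        AppendError::Io(_) => {
            shards_to_close.insert(queue_id);
            FailureReason::ShardClosed
        }
        AppendError::QueueNotFound(_) => {
            shards_to_delete.insert(queue_id);
            FailureReason::ShardNotFound
        }
    }
}

fn main() {
    let mut shards: HashMap<QueueId, Shard> = HashMap::new();
    shards.insert("queue-1".to_string(), Shard::default());
    shards.insert("queue-2".to_string(), Shard::default());

    let mut shards_to_close = HashSet::new();
    let mut shards_to_delete = HashSet::new();

    // An IO error on append marks the shard for closing.
    let io_error = AppendError::Io(std::io::Error::from(std::io::ErrorKind::PermissionDenied));
    let reason = handle_append_error(
        "queue-1".to_string(),
        &io_error,
        &mut shards_to_close,
        &mut shards_to_delete,
    );
    assert_eq!(reason, FailureReason::ShardClosed);

    // A missing WAL queue marks the shard for deletion.
    let not_found = AppendError::QueueNotFound("queue-2".to_string());
    let reason = handle_append_error(
        "queue-2".to_string(),
        &not_found,
        &mut shards_to_close,
        &mut shards_to_delete,
    );
    assert_eq!(reason, FailureReason::ShardNotFound);

    // Apply the bookkeeping after the batch loop, as the patch does.
    for queue_id in &shards_to_close {
        if let Some(shard) = shards.get_mut(queue_id) {
            shard.closed = true;
        }
    }
    for queue_id in &shards_to_delete {
        shards.remove(queue_id);
    }
    assert!(shards["queue-1"].closed);
    assert!(!shards.contains_key("queue-2"));
}
```

Collecting queue ids into `HashSet`s and deferring the close/delete bookkeeping until after the loop mirrors the patch: each failing subrequest is reported individually while the remaining subrequests of the same request are still appended.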