Handle IO errors in replication tasks (#4540)
guilload committed Mar 21, 2024
1 parent c1c6301 commit 38d559e
Showing 9 changed files with 316 additions and 73 deletions.
1 change: 0 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/failpoints/mod.rs
@@ -211,7 +211,7 @@ async fn aux_test_failpoints() -> anyhow::Result<()> {
Ok(())
}

-const TEST_TEXT: &'static str = r#"His sole child, my lord, and bequeathed to my
+const TEST_TEXT: &str = r#"His sole child, my lord, and bequeathed to my
overlooking. I have those hopes of her good that
her education promises; her dispositions she
inherits, which makes fair gifts fairer; for where
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/Cargo.toml
@@ -16,9 +16,8 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
dyn-clone = { workspace = true }
-fail = { workspace = true }
+fail = { workspace = true, optional = true }
flume = { workspace = true }
-fnv = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
@@ -59,4 +58,5 @@ quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-codegen = { workspace = true }

[features]
+failpoints = ["fail/failpoints"]
testsuite = ["mockall"]
39 changes: 21 additions & 18 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -27,7 +27,6 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use async_trait::async_trait;
use bytesize::ByteSize;
-use fnv::FnvHashMap;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
@@ -190,11 +189,15 @@ impl Ingester {
};
match mrecordlog.create_queue(&queue_id).await {
Ok(_) => {}
-Err(CreateQueueError::AlreadyExists) => panic!("queue should not exist"),
+Err(CreateQueueError::AlreadyExists) => {
+error!("WAL queue `{queue_id}` already exists");
+let message = format!("WAL queue `{queue_id}` already exists");
+return Err(IngestV2Error::Internal(message));
+}
Err(CreateQueueError::IoError(io_error)) => {
// TODO: Close all shards and set readiness to false.
-error!("failed to create mrecordlog queue `{queue_id}`: {io_error}");
-return Err(IngestV2Error::Internal(format!("Io Error: {io_error}")));
+error!("failed to create WAL queue `{queue_id}`: {io_error}",);
+let message = format!("failed to create WAL queue `{queue_id}`: {io_error}");
+return Err(IngestV2Error::Internal(message));
}
};
let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
@@ -348,7 +351,7 @@ impl Ingester {

async fn init_replication_stream(
&self,
-replication_streams: &mut FnvHashMap<FollowerId, ReplicationStreamTaskHandle>,
+replication_streams: &mut HashMap<FollowerId, ReplicationStreamTaskHandle>,
leader_id: NodeId,
follower_id: NodeId,
) -> IngestV2Result<ReplicationClient> {
@@ -460,7 +463,8 @@ impl Ingester {

// first verify if we would locally accept each subrequest
{
-let mut sum_of_requested_capacity = bytesize::ByteSize::b(0);
+let mut total_requested_capacity = bytesize::ByteSize::b(0);

for subrequest in persist_request.subrequests {
let queue_id = subrequest.queue_id();

@@ -515,7 +519,7 @@ impl Ingester {
&state_guard.mrecordlog,
self.disk_capacity,
self.memory_capacity,
-requested_capacity + sum_of_requested_capacity,
+requested_capacity + total_requested_capacity,
) {
rate_limited_warn!(
limit_per_min = 10,
@@ -553,7 +557,7 @@ impl Ingester {

let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
-sum_of_requested_capacity += requested_capacity;
+total_requested_capacity += requested_capacity;

if let Some(follower_id) = follower_id_opt {
let replicate_subrequest = ReplicateSubrequest {
@@ -736,7 +740,6 @@ impl Ingester {
persist_successes.push(persist_success);
}
}

if !shards_to_close.is_empty() {
for queue_id in &shards_to_close {
let shard = state_guard
@@ -745,18 +748,15 @@
.expect("shard should exist");

shard.close();
+warn!("closed shard `{queue_id}` following IO error");
}
-info!(
-"closed {} shard(s) following IO error(s)",
-shards_to_close.len()
-);
}
if !shards_to_delete.is_empty() {
for queue_id in &shards_to_delete {
state_guard.shards.remove(queue_id);
state_guard.rate_trackers.remove(queue_id);
+warn!("deleted dangling shard `{queue_id}`");
}
-info!("deleted {} dangling shard(s)", shards_to_delete.len());
}
let wal_usage = state_guard.mrecordlog.resource_usage();
drop(state_guard);
@@ -1613,10 +1613,13 @@
);
}

-// This test should be run manually and independently of other tests with the `fail/failpoints`
-// feature enabled.
+// This test should be run manually and independently of other tests with the `failpoints`
+// feature enabled:
+// ```sh
+// cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_ingester_persist_closes_shard_on_io_error
+// ```
+#[cfg(feature = "failpoints")]
#[tokio::test]
-#[ignore]
async fn test_ingester_persist_closes_shard_on_io_error() {
let scenario = fail::FailScenario::setup();
fail::cfg("ingester:append_records", "return").unwrap();
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -33,14 +33,14 @@ mod routing_table;
mod state;
mod workbench;

+use std::collections::HashMap;
use std::ops::{Add, AddAssign};
use std::time::Duration;
use std::{env, fmt};

pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos};
use bytes::{BufMut, BytesMut};
use bytesize::ByteSize;
-use fnv::FnvHashMap;
use quickwit_common::tower::Pool;
use quickwit_proto::ingest::ingester::IngesterServiceClient;
use quickwit_proto::ingest::router::{IngestRequestV2, IngestSubrequest};
@@ -138,7 +138,7 @@ impl DocBatchV2Builder {
/// Helper struct to build an [`IngestRequestV2`].
#[derive(Debug, Default)]
pub struct IngestRequestV2Builder {
-per_index_id_doc_batch_builders: FnvHashMap<IndexId, DocBatchV2Builder>,
+per_index_id_doc_batch_builders: HashMap<IndexId, DocBatchV2Builder>,
}

impl IngestRequestV2Builder {
16 changes: 13 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -22,6 +22,7 @@ use std::iter::once;
use std::ops::RangeInclusive;

use bytesize::ByteSize;
+#[cfg(feature = "failpoints")]
use fail::fail_point;
use mrecordlog::error::{AppendError, DeleteQueueError};
use quickwit_proto::ingest::DocBatchV2;
@@ -54,19 +55,25 @@ pub(super) async fn append_non_empty_doc_batch(
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));

+#[cfg(feature = "failpoints")]
fail_point!("ingester:append_records", |_| {
let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
Err(AppendDocBatchError::Io(io_error))
});

mrecordlog
.append_records(queue_id, None, encoded_mrecords)
.await
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());

+#[cfg(feature = "failpoints")]
fail_point!("ingester:append_records", |_| {
let io_error = io::Error::from(io::ErrorKind::PermissionDenied);
Err(AppendDocBatchError::Io(io_error))
});

mrecordlog
.append_records(queue_id, None, encoded_mrecords)
.await
@@ -203,10 +210,13 @@ mod tests {
assert_eq!(position, Position::offset(2u64));
}

-// This test should be run manually and independently of other tests with the `fail/failpoints`
-// feature enabled.
+// This test should be run manually and independently of other tests with the `failpoints`
+// feature enabled:
+// ```sh
+// cargo test --manifest-path quickwit/Cargo.toml -p quickwit-ingest --features failpoints -- test_append_non_empty_doc_batch_io_error
+// ```
+#[cfg(feature = "failpoints")]
#[tokio::test]
-#[ignore]
async fn test_append_non_empty_doc_batch_io_error() {
let scenario = fail::FailScenario::setup();
fail::cfg("ingester:append_records", "return").unwrap();
14 changes: 10 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.md
@@ -9,14 +9,20 @@ Two gRPC streams back the independent streams of requests and responses between
### Life of a happy persist request
1. Leader receives a persist request pre-assigned to a shard from a router.

-1. Leader writes the data to the corresponding mrecordlog queue and records the new position of the queue called `primary_position`.
-
-1. Leader sends replicate request to follower of the shard via the SYN replication stream.
+1. Leader forwards replicate request to follower of the shard via the SYN replication stream.

1. Follower receives the replicate request, writes the data to its replica queue, and records the new position of the queue called `replica_position`.

1. Follower returns replicate response to leader via the ACK replication stream.

-1. Leader records the new position of the replica queue. It should match the `primary_position`.
+1. Leader records the new position of the replica queue.
+
+1. Leader writes the data to its local mrecordlog queue and records the new position of the queue called `primary_position`. It should match the `replica_position`.

1. Leader returns a success persist response to the router (see the sketch below).
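
Purely for illustration, and not part of the diff above: a minimal, self-contained Rust sketch of the leader-side happy path described in the list. The `Position`, `Wal`, `ReplicationClient`, and `persist` names are hypothetical stand-ins for the real quickwit-ingest types; only the ordering of the steps reflects the protocol documented here.

```rust
// Hypothetical stand-ins for the real quickwit-ingest types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Position(u64);

struct Wal {
    num_records: u64,
}

impl Wal {
    /// Appends a batch to the local mrecordlog queue and returns the new `primary_position`.
    fn append(&mut self, _queue_id: &str, records: &[Vec<u8>]) -> Position {
        self.num_records += records.len() as u64;
        Position(self.num_records)
    }
}

struct ReplicationClient;

impl ReplicationClient {
    /// Forwards the replicate request to the follower via the SYN stream and waits for
    /// the `replica_position` reported back on the ACK stream.
    fn replicate(&self, _queue_id: &str, records: &[Vec<u8>]) -> Position {
        Position(records.len() as u64)
    }
}

/// Happy path on the leader: replicate first, then append locally, then answer the router.
fn persist(
    wal: &mut Wal,
    replication_client: &ReplicationClient,
    queue_id: &str,
    records: Vec<Vec<u8>>,
) -> Result<Position, String> {
    // Forward the data to the follower and record `replica_position`.
    let replica_position = replication_client.replicate(queue_id, &records);
    // Append the data to the local mrecordlog queue and record `primary_position`.
    let primary_position = wal.append(queue_id, &records);
    // The two positions should match.
    if primary_position != replica_position {
        return Err(format!(
            "position mismatch: primary={primary_position:?}, replica={replica_position:?}"
        ));
    }
    // Return a success persist response to the router.
    Ok(primary_position)
}

fn main() {
    let mut wal = Wal { num_records: 0 };
    let replication_client = ReplicationClient;
    let records = vec![b"doc-1".to_vec(), b"doc-2".to_vec()];
    let position = persist(&mut wal, &replication_client, "index:source:shard-1", records).unwrap();
    println!("persisted up to {position:?}");
}
```

Note how the reordered list above has the leader append to its own queue only after the follower has acknowledged, so the local write can also take the replica position into account.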

+### Replication stream errors
+
+- When a replication request fails, the leader and follower close the shard(s) targeted by the request.
+
+- When a replication stream fails (transport error, timeout), the leader and follower close the shard(s) targeted by the stream. Then, the leader opens a new stream if necessary (see the sketch below).
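
Again purely illustrative and not part of the diff: a toy sketch of the rule above, under the assumption that shards are tracked in a simple map keyed by queue ID. The `ShardState` enum, `close_shards_on_io_error` function, and queue-ID strings are all made up for the example; the point is that an IO error on a replication request closes the targeted shards instead of failing the whole ingester.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
enum ShardState {
    Open,
    Closed,
}

/// Closes every shard targeted by a failed replicate request.
fn close_shards_on_io_error(shards: &mut HashMap<String, ShardState>, failed_queue_ids: &[&str]) {
    for &queue_id in failed_queue_ids {
        if let Some(state) = shards.get_mut(queue_id) {
            *state = ShardState::Closed;
            eprintln!("closed shard `{queue_id}` following IO error");
        }
    }
}

fn main() {
    let mut shards = HashMap::from([
        ("index:source:shard-1".to_string(), ShardState::Open),
        ("index:source:shard-2".to_string(), ShardState::Open),
    ]);
    // Pretend the replicate request targeting shard-1 failed with an IO error.
    close_shards_on_io_error(&mut shards, &["index:source:shard-1"]);
    assert_eq!(shards["index:source:shard-1"], ShardState::Closed);
    assert_eq!(shards["index:source:shard-2"], ShardState::Open);
}
```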