Skip to content

Commit

Permalink
Merge branch 'main' into 5117-disable-per-index-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jun 14, 2024
2 parents 1e0e2fa + 1c2ec26 commit 1ab9741
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 33 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tracing::error;
/// QW_RUNTIME_NUM_THREADS environment variable.
fn get_main_runtime_num_threads() -> usize {
let default_num_runtime_threads: usize = quickwit_common::num_cpus().div_ceil(3);
quickwit_common::get_from_env("QW_RUNTIME_NUM_THREADS", default_num_runtime_threads)
quickwit_common::get_from_env("QW_TOKIO_RUNTIME_NUM_THREADS", default_num_runtime_threads)
}

fn main() -> anyhow::Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ impl CloseIdleShardsTask {
let Some(state) = self.weak_state.upgrade() else {
return;
};
let mut state_guard =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write")
.await
.expect("ingester should be ready");
let Ok(mut state_guard) =
with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await
else {
return;
};

let now = Instant::now();

Expand Down
35 changes: 7 additions & 28 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3174,31 +3174,24 @@ mod tests {

#[tokio::test]
async fn test_ingester_closes_idle_shards() {
// The `CloseIdleShardsTask` task is already unit tested, so this test ensures the task is
// correctly spawned upon starting an ingester.
let idle_shard_timeout = Duration::from_millis(200);
let (_ingester_ctx, ingester) = IngesterForTest::default()
.with_idle_shard_timeout(idle_shard_timeout)
.build()
.await;

let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));

let shard_01 = Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
..Default::default()
};
let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1));

let shard_02 = Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2)),
shard_state: ShardState::Closed as i32,
..Default::default()
};
let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2));

let mut state_guard = ingester.state.lock_fully().await.unwrap();
let now = Instant::now();

Expand All @@ -3211,31 +3204,17 @@ mod tests {
)
.await
.unwrap();
ingester
.init_primary_shard(
&mut state_guard.inner,
&mut state_guard.mrecordlog,
shard_02,
now,
)
.await
.unwrap();
drop(state_guard);

tokio::time::sleep(Duration::from_millis(100)).await; // 2 times the run interval period of the close idle shards task
drop(state_guard);
tokio::time::sleep(Duration::from_millis(500)).await;

let state_guard = ingester.state.lock_partially().await.unwrap();

state_guard
.shards
.get(&queue_id_01)
.unwrap()
.assert_is_closed();
state_guard
.shards
.get(&queue_id_02)
.unwrap()
.assert_is_open();
drop(state_guard);
}

#[tokio::test]
Expand Down

0 comments on commit 1ab9741

Please sign in to comment.