From 6c46d9e9c79e0ef60a6d12e3b0224b255268ddf7 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 25 Apr 2023 11:10:35 +0900 Subject: [PATCH] Avoid recomputing new_pipeline_ids to increase readability. (#3215) --- .../quickwit-indexing/src/actors/indexing_service.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 6282c695b9..57267de233 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -472,14 +472,18 @@ impl IndexingService { let running_pipeline_ids: HashSet = self.indexing_pipeline_handles.keys().cloned().collect(); - let new_pipeline_ids = updated_pipeline_ids + + let new_pipeline_ids: Vec<&IndexingPipelineId> = updated_pipeline_ids .difference(&running_pipeline_ids) - .collect_vec(); + .collect(); + // We fetch the new indexes metadata. let indexes_metadata_futures = new_pipeline_ids .iter() + // No need to emit two request for the same `index_id` .unique_by(|pipeline_id| pipeline_id.index_id.clone()) .map(|pipeline_id| self.index_metadata(ctx, &pipeline_id.index_id)); + let indexes_metadata = try_join_all(indexes_metadata_futures).await?; let indexes_metadata_by_index_id: HashMap = indexes_metadata .into_iter() @@ -489,7 +493,7 @@ impl IndexingService { let mut failed_spawning_pipeline_ids: Vec = Vec::new(); // Add new pipelines. - for new_pipeline_id in updated_pipeline_ids.difference(&running_pipeline_ids) { + for new_pipeline_id in new_pipeline_ids { info!(pipeline_id=?new_pipeline_id, "Spawning indexing pipeline."); let index_metadata = indexes_metadata_by_index_id .get(&new_pipeline_id.index_id)