Skip to content

Commit

Permalink
Fix DiskANN's Async cache making (#250)
Browse files Browse the repository at this point in the history
Fix DiskANN Async Cache making:

Create State Controller with longer lifespan than the PQ_Flash_Index.
Add R/W lock to cache generating and search

Signed-off-by: Li Liu <[email protected]>
Co-authored-by: cqy123456 <[email protected]>
  • Loading branch information
liliu-z and cqy123456 committed Dec 5, 2023
1 parent b596cfb commit c0aa5fb
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 277 deletions.
7 changes: 2 additions & 5 deletions knowhere/index/vector_index/IndexDiskANN.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ IndexDiskANN<T>::IndexDiskANN(std::string index_prefix, MetricType metric_type,
namespace {
static constexpr float kCacheExpansionRate = 1.2;
static constexpr uint32_t kLinuxAioMaxnrLimit = 65536;
static auto async_pool = ThreadPool(1);
void
CheckPreparation(bool is_prepared) {
if (!is_prepared) {
Expand Down Expand Up @@ -316,10 +315,8 @@ IndexDiskANN<T>::Prepare(const Config& config) {
}
} else {
// init the statistical object
pq_flash_index_->init_cache_async_task();
async_pool.push([&, cache_num = num_nodes_to_cache, sample_nodes_file = warmup_query_file]() {
pq_flash_index_->generate_cache_list_from_sample_queries(sample_nodes_file, 15, 6, cache_num);
});
pq_flash_index_->async_generate_cache_list_from_sample_queries(warmup_query_file, 15, 6,
num_nodes_to_cache);
}
}
// warmup
Expand Down
4 changes: 2 additions & 2 deletions knowhere/index/vector_index/IndexDiskANN.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class IndexDiskANN : public VecIndex {
}

/**
* @brief Due to legacy reasons, Train() and AddWithoutIds() are bonded together. We will put the building work in
* AddWIthoutIds() and leave Train() empty for now.
* @brief Due to legacy reasons, Train() and AddWithoutIds() are bonded together. We will put the building work
* in AddWIthoutIds() and leave Train() empty for now.
*/
void
Train(const DatasetPtr& /* unused */, const Config& /* unused */) override{};
Expand Down
94 changes: 55 additions & 39 deletions thirdparty/DiskANN/include/pq_flash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

#pragma once
#include <cassert>
#include <condition_variable>
#include <optional>
#include <mutex>
#include <sstream>
#include <stack>
#include <string>
Expand All @@ -22,7 +24,6 @@
#include "percentile_stats.h"
#include "pq_table.h"
#include "utils.h"
#include "semaphore.h"
#include "windows_customizations.h"

#define MAX_GRAPH_DEGREE 512
Expand All @@ -32,9 +33,25 @@
#define FULL_PRECISION_REORDER_MULTIPLIER 3

namespace diskann {
class ThreadSafeStateController {
public:
enum class Status {
NONE,
DOING,
STOPPING,
DONE,
KILLED,
};

Status status;
std::condition_variable cond;
std::mutex status_mtx;
std::shared_mutex cache_mtx;
};

template<typename T>
struct QueryScratch {
T * coord_scratch = nullptr; // MUST BE AT LEAST [sizeof(T) * data_dim]
T *coord_scratch = nullptr; // MUST BE AT LEAST [sizeof(T) * data_dim]

char *sector_scratch =
nullptr; // MUST BE AT LEAST [MAX_N_SECTOR_READS * SECTOR_LEN]
Expand All @@ -46,7 +63,7 @@ namespace diskann {
nullptr; // MUST BE AT LEAST diskann MAX_DEGREE
_u8 *aligned_pq_coord_scratch =
nullptr; // MUST BE AT LEAST [N_CHUNKS * MAX_DEGREE]
T * aligned_query_T = nullptr;
T *aligned_query_T = nullptr;
float *aligned_query_float = nullptr;

tsl::robin_set<_u64> *visited = nullptr;
Expand All @@ -68,26 +85,26 @@ namespace diskann {
public:
DISKANN_DLLEXPORT PQFlashIndex(
std::shared_ptr<AlignedFileReader> fileReader,
diskann::Metric metric = diskann::Metric::L2);
diskann::Metric metric = diskann::Metric::L2);
DISKANN_DLLEXPORT ~PQFlashIndex();

#ifdef EXEC_ENV_OLS
DISKANN_DLLEXPORT int load(diskann::MemoryMappedFiles &files,
uint32_t num_threads, const char *index_prefix);
#else
// load compressed data, and obtains the handle to the disk-resident index
DISKANN_DLLEXPORT int load(uint32_t num_threads, const char *index_prefix);
DISKANN_DLLEXPORT int load(uint32_t num_threads, const char *index_prefix);
#endif

DISKANN_DLLEXPORT void load_cache_list(std::vector<uint32_t> &node_list);

// asynchronously collect the access frequency of each node in the graph
#ifdef EXEC_ENV_OLS
DISKANN_DLLEXPORT void generate_cache_list_from_sample_queries(
DISKANN_DLLEXPORT void async_generate_cache_list_from_sample_queries(
MemoryMappedFiles files, std::string sample_bin, _u64 l_search,
_u64 beamwidth, _u64 num_nodes_to_cache);
#else
DISKANN_DLLEXPORT void generate_cache_list_from_sample_queries(
DISKANN_DLLEXPORT void async_generate_cache_list_from_sample_queries(
std::string sample_bin, _u64 l_search, _u64 beamwidth,
_u64 num_nodes_to_cache);
#endif
Expand All @@ -100,19 +117,15 @@ namespace diskann {
float *res_dists, const _u64 beam_width,
const bool use_reorder_data = false, QueryStats *stats = nullptr,
const knowhere::feder::diskann::FederResultUniq &feder = nullptr,
faiss::BitsetView bitset_view = nullptr,
const float filter_ratio = -1.0f);
faiss::BitsetView bitset_view = nullptr,
const float filter_ratio = -1.0f);


DISKANN_DLLEXPORT _u32 range_search(const T *query1, const double range,
const _u64 min_l_search,
const _u64 max_l_search,
std::vector<_s64> & indices,
std::vector<float> &distances,
const _u64 beam_width,
const float l_k_ratio,
faiss::BitsetView bitset_view = nullptr,
QueryStats * stats = nullptr);
DISKANN_DLLEXPORT _u32
range_search(const T *query1, const double range, const _u64 min_l_search,
const _u64 max_l_search, std::vector<_s64> &indices,
std::vector<float> &distances, const _u64 beam_width,
const float l_k_ratio, faiss::BitsetView bitset_view = nullptr,
QueryStats *stats = nullptr);

std::shared_ptr<AlignedFileReader> reader;

Expand All @@ -122,13 +135,13 @@ namespace diskann {

DISKANN_DLLEXPORT _u64 get_max_degree() const noexcept;

DISKANN_DLLEXPORT _u32* get_medoids() const noexcept;
DISKANN_DLLEXPORT _u32 *get_medoids() const noexcept;

DISKANN_DLLEXPORT size_t get_num_medoids() const noexcept;

DISKANN_DLLEXPORT diskann::Metric get_metric() const noexcept;
// init asy
DISKANN_DLLEXPORT void init_cache_async_task();
// for async cache making task
DISKANN_DLLEXPORT void destroy_cache_async_task();

protected:
DISKANN_DLLEXPORT void use_medoids_data_as_centroids();
Expand All @@ -138,14 +151,15 @@ namespace diskann {
private:
// sector # on disk where node_id is present with in the graph part
_u64 get_node_sector_offset(_u64 node_id) {
return long_node ? (node_id * nsectors_per_node + 1) * SECTOR_LEN
: (node_id / nnodes_per_sector + 1) * SECTOR_LEN;
return long_node ? (node_id * nsectors_per_node + 1) * SECTOR_LEN
: (node_id / nnodes_per_sector + 1) * SECTOR_LEN;
}

// obtains region of sector containing node
char* get_offset_to_node(char* sector_buf, _u64 node_id) {
return long_node ? sector_buf
: sector_buf + (node_id % nnodes_per_sector) * max_node_len;
char *get_offset_to_node(char *sector_buf, _u64 node_id) {
return long_node
? sector_buf
: sector_buf + (node_id % nnodes_per_sector) * max_node_len;
}

inline void copy_vec_base_data(T *des, const int64_t des_idx, void *src);
Expand All @@ -160,10 +174,10 @@ namespace diskann {
// The beam width is adjusted in the function.
void brute_force_beam_search(
ThreadData<T> &data, const float query_norm, const _u64 k_search,
_s64 *indices, float *distances, const _u64 beam_width_param, IOContext &ctx,
QueryStats *stats,
_s64 *indices, float *distances, const _u64 beam_width_param,
IOContext &ctx, QueryStats *stats,
const knowhere::feder::diskann::FederResultUniq &feder,
faiss::BitsetView bitset_view);
faiss::BitsetView bitset_view);

// index info
// nhood of node `i` is in sector: [i / nnodes_per_sector]
Expand Down Expand Up @@ -204,13 +218,13 @@ namespace diskann {
// data: _u8 * n_chunks
// chunk_size = chunk size of each dimension chunk
// pq_tables = float* [[2^8 * [chunk_size]] * n_chunks]
_u8 * data = nullptr;
_u8 *data = nullptr;
_u64 n_chunks;
FixedChunkPQTable pq_table;

// distance function
DISTFUN<T> dist_cmp;
DISTFUN<float> dist_cmp_float;
DISTFUN<T> dist_cmp;
DISTFUN<float> dist_cmp_float;

// for very large datasets: we use PQ even for the disk resident index
bool use_disk_index_pq = false;
Expand All @@ -230,14 +244,16 @@ namespace diskann {
float *centroid_data = nullptr;

// nhood_cache
unsigned * nhood_cache_buf = nullptr;
tsl::robin_map<_u32, std::pair<_u32, _u32 *>> nhood_cache; // <id, <neihbors_num, neihbors>>
unsigned *nhood_cache_buf = nullptr;
tsl::robin_map<_u32, std::pair<_u32, _u32 *>>
nhood_cache; // <id, <neihbors_num, neihbors>>

// coord_cache
T * coord_cache_buf = nullptr;
T *coord_cache_buf = nullptr;
tsl::robin_map<_u32, T *> coord_cache;
Semaphore semaph;
std::atomic<bool> async_generate_cache = false;

std::shared_ptr<ThreadSafeStateController> state_controller =
std::make_shared<ThreadSafeStateController>();

// thread-specific scratch
ConcurrentQueue<ThreadData<T>> thread_data;
Expand All @@ -254,7 +270,7 @@ namespace diskann {
// any additions we make to the header. This is an outer limit
// on how big the header can be.
static const int HEADER_SIZE = SECTOR_LEN;
char * getHeaderBytes();
char *getHeaderBytes();
#endif
};
} // namespace diskann
35 changes: 0 additions & 35 deletions thirdparty/DiskANN/include/semaphore.h

This file was deleted.

Loading

0 comments on commit c0aa5fb

Please sign in to comment.