Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: add more trace for search & query #32734

Merged
merged 6 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/core/src/segcore/Reduce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ReduceHelper::Reduce() {

void
ReduceHelper::Marshal() {
tracer::AutoSpan span("ReduceHelper::Marshal", trace_ctx_, false);
// get search result data blobs of slices
search_result_data_blobs_ =
std::make_unique<milvus::segcore::SearchResultDataBlobs>();
Expand Down Expand Up @@ -131,6 +132,7 @@ ReduceHelper::FilterInvalidSearchResult(SearchResult* search_result) {

void
ReduceHelper::FillPrimaryKey() {
tracer::AutoSpan span("ReduceHelper::FillPrimaryKey", trace_ctx_, false);
// get primary keys for duplicates removal
uint32_t valid_index = 0;
for (auto& search_result : search_results_) {
Expand All @@ -153,6 +155,8 @@ ReduceHelper::FillPrimaryKey() {

void
ReduceHelper::RefreshSearchResult() {
tracer::AutoSpan span(
"ReduceHelper::RefreshSearchResult", trace_ctx_, false);
for (int i = 0; i < num_segments_; i++) {
std::vector<int64_t> real_topks(total_nq_, 0);
auto search_result = search_results_[i];
Expand Down Expand Up @@ -212,6 +216,7 @@ ReduceHelper::RefreshSearchResult() {

void
ReduceHelper::FillEntryData() {
tracer::AutoSpan span("ReduceHelper::FillEntryData", trace_ctx_, false);
for (auto search_result : search_results_) {
auto segment = static_cast<milvus::segcore::SegmentInterface*>(
search_result->segment_);
Expand Down Expand Up @@ -312,6 +317,7 @@ ReduceHelper::ReduceSearchResultForOneNQ(int64_t qi,

void
ReduceHelper::ReduceResultData() {
tracer::AutoSpan span("ReduceHelper::ReduceResultData", trace_ctx_, false);
for (int i = 0; i < num_segments_; i++) {
auto search_result = search_results_[i];
auto result_count = search_result->get_total_result_count();
Expand Down
9 changes: 7 additions & 2 deletions internal/core/src/segcore/Reduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/QueryResult.h"
#include "query/PlanImpl.h"
#include "ReduceStructure.h"
#include "common/Tracer.h"
#include "segment_c.h"

namespace milvus::segcore {
Expand All @@ -37,11 +38,13 @@ class ReduceHelper {
milvus::query::Plan* plan,
int64_t* slice_nqs,
int64_t* slice_topKs,
int64_t slice_num)
int64_t slice_num,
tracer::TraceContext* trace_ctx)
: search_results_(search_results),
plan_(plan),
slice_nqs_(slice_nqs, slice_nqs + slice_num),
slice_topKs_(slice_topKs, slice_topKs + slice_num) {
slice_topKs_(slice_topKs, slice_topKs + slice_num),
trace_ctx_(trace_ctx) {
Initialize();
}

Expand Down Expand Up @@ -109,6 +112,8 @@ class ReduceHelper {

// output
std::unique_ptr<SearchResultDataBlobs> search_result_data_blobs_;

tracer::TraceContext* trace_ctx_;
};

} // namespace milvus::segcore
18 changes: 13 additions & 5 deletions internal/core/src/segcore/SegmentInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@
SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
longjiquan marked this conversation as resolved.
Show resolved Hide resolved
Timestamp timestamp,
int64_t limit_size) const {
return Retrieve(plan, timestamp, limit_size, false);
return Retrieve(nullptr, plan, timestamp, limit_size, false);
}

std::unique_ptr<proto::segcore::RetrieveResults>
SegmentInternalInterface::Retrieve(const query::RetrievePlan* plan,
SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", trace_ctx, false);
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(*this, timestamp);
auto retrieve_results = visitor.get_retrieve_result(*plan->plan_node_);
Expand All @@ -118,7 +120,8 @@

results->mutable_offset()->Add(retrieve_results.result_offsets_.begin(),
retrieve_results.result_offsets_.end());
FillTargetEntry(plan,
FillTargetEntry(trace_ctx,
plan,
results,
retrieve_results.result_offsets_.data(),
retrieve_results.result_offsets_.size(),
Expand All @@ -130,12 +133,15 @@

void
SegmentInternalInterface::FillTargetEntry(
tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
const std::unique_ptr<proto::segcore::RetrieveResults>& results,
const int64_t* offsets,
int64_t size,
bool ignore_non_pk,
bool fill_ids) const {
tracer::AutoSpan span("FillTargetEntry", trace_ctx, false);

auto fields_data = results->mutable_fields_data();
auto ids = results->mutable_ids();
auto pk_field_id = plan->schema_.get_primary_field_id();
Expand Down Expand Up @@ -215,12 +221,14 @@
}

std::unique_ptr<proto::segcore::RetrieveResults>
SegmentInternalInterface::Retrieve(const query::RetrievePlan* Plan,
SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,

Check warning on line 224 in internal/core/src/segcore/SegmentInterface.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentInterface.cpp#L224

Added line #L224 was not covered by tests
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const {
std::shared_lock lck(mutex_);
tracer::AutoSpan span("RetrieveByOffsets", trace_ctx, false);

Check warning on line 229 in internal/core/src/segcore/SegmentInterface.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentInterface.cpp#L229

Added line #L229 was not covered by tests
auto results = std::make_unique<proto::segcore::RetrieveResults>();
FillTargetEntry(Plan, results, offsets, size, false, false);
FillTargetEntry(trace_ctx, Plan, results, offsets, size, false, false);

Check warning on line 231 in internal/core/src/segcore/SegmentInterface.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentInterface.cpp#L231

Added line #L231 was not covered by tests
return results;
}

Expand Down
13 changes: 9 additions & 4 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ class SegmentInterface {
int64_t limit_size) const = 0;

virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const = 0;

virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const = 0;

Expand Down Expand Up @@ -171,13 +173,15 @@ class SegmentInternalInterface : public SegmentInterface {
int64_t limit_size) const override;

std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
Timestamp timestamp,
int64_t limit_size,
bool ignore_non_pk) const override;

std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const override;

Expand Down Expand Up @@ -303,6 +307,7 @@ class SegmentInternalInterface : public SegmentInterface {

void
FillTargetEntry(
tracer::TraceContext* trace_ctx,
const query::RetrievePlan* plan,
const std::unique_ptr<proto::segcore::RetrieveResults>& results,
const int64_t* offsets,
Expand Down
15 changes: 12 additions & 3 deletions internal/core/src/segcore/reduce_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ GetStreamReduceResult(CSearchStreamReducer c_stream_reducer,
}

CStatus
ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
ReduceSearchResultsAndFillData(CTraceContext c_trace,
CSearchResultDataBlobs* cSearchResultDataBlobs,
CSearchPlan c_plan,
CSearchResult* c_search_results,
int64_t num_segments,
Expand All @@ -85,13 +86,21 @@ ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
// get SearchResult and SearchPlan
auto plan = static_cast<milvus::query::Plan*>(c_plan);
AssertInfo(num_segments > 0, "num_segments must be greater than 0");
auto trace_ctx = milvus::tracer::TraceContext{
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span(
"ReduceSearchResultsAndFillData", &trace_ctx, true);
std::vector<SearchResult*> search_results(num_segments);
for (int i = 0; i < num_segments; ++i) {
search_results[i] = static_cast<SearchResult*>(c_search_results[i]);
}

auto reduce_helper = milvus::segcore::ReduceHelper(
search_results, plan, slice_nqs, slice_topKs, num_slices);
auto reduce_helper = milvus::segcore::ReduceHelper(search_results,
plan,
slice_nqs,
slice_topKs,
num_slices,
&trace_ctx);
reduce_helper.Reduce();
reduce_helper.Marshal();

Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/reduce_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ GetStreamReduceResult(CSearchStreamReducer c_stream_reducer,
CSearchResultDataBlobs* c_search_result_data_blobs);

CStatus
ReduceSearchResultsAndFillData(CSearchResultDataBlobs* cSearchResultDataBlobs,
ReduceSearchResultsAndFillData(CTraceContext c_trace,
CSearchResultDataBlobs* cSearchResultDataBlobs,
CSearchPlan c_plan,
CSearchResult* search_results,
int64_t num_segments,
Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@
c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
milvus::tracer::AutoSpan span("SegCoreRetrieve", &trace_ctx, true);

auto retrieve_result =
segment->Retrieve(plan, timestamp, limit_size, ignore_non_pk);
auto retrieve_result = segment->Retrieve(
&trace_ctx, plan, timestamp, limit_size, ignore_non_pk);

auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);
Expand Down Expand Up @@ -174,7 +174,8 @@
milvus::tracer::AutoSpan span(
"SegCoreRetrieveByOffsets", &trace_ctx, true);

auto retrieve_result = segment->Retrieve(plan, offsets, len);
auto retrieve_result =
segment->Retrieve(&trace_ctx, plan, offsets, len);

Check warning on line 178 in internal/core/src/segcore/segment_c.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/segment_c.cpp#L177-L178

Added lines #L177 - L178 were not covered by tests

auto size = retrieve_result->ByteSizeLong();
std::unique_ptr<uint8_t[]> buffer(new uint8_t[size]);
Expand Down
18 changes: 12 additions & 6 deletions internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1538,7 +1538,8 @@ TEST(CApiTest, ReduceNullResult) {
ASSERT_EQ(status.error_code, Success);
results.push_back(res);
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down Expand Up @@ -1631,7 +1632,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
results.push_back(res2);

CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down Expand Up @@ -1667,7 +1669,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
results.push_back(res2);
results.push_back(res3);
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down Expand Up @@ -1810,7 +1813,8 @@ testReduceSearchWithExpr(int N,

// 1. reduce
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down Expand Up @@ -3504,7 +3508,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
auto slice_topKs = std::vector<int64_t>{topK};

CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down Expand Up @@ -3700,7 +3705,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
auto slice_topKs = std::vector<int64_t>{topK};

CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
plan,
results.data(),
results.size(),
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_group_by.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ TEST(GroupBY, Reduce) {
auto slice_nqs = std::vector<int64_t>{num_queries / 2, num_queries / 2};
auto slice_topKs = std::vector<int64_t>{topK / 2, topK};
CSearchResultDataBlobs cSearchResultData;
status = ReduceSearchResultsAndFillData(&cSearchResultData,
status = ReduceSearchResultsAndFillData({},
&cSearchResultData,
c_plan,
results.data(),
results.size(),
Expand Down
7 changes: 7 additions & 0 deletions internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ func (t *searchTask) reduceResults(ctx context.Context, toReduceResults []*inter
metricType = toReduceResults[0].GetMetricType()
}

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "reduceResults")
defer sp.End()

// Decode all search results
validSearchResults, err := decodeSearchResults(ctx, toReduceResults)
if err != nil {
Expand Down Expand Up @@ -838,6 +841,8 @@ func doRequery(ctx context.Context,
// 3 2 5 4 1 (result ids)
// v3 v2 v5 v4 v1 (result vectors)
// ===========================================
_, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "reorganizeRequeryResults")
defer sp.End()
pkFieldData, err := typeutil.GetPrimaryFieldData(queryResult.GetFieldsData(), pkField)
if err != nil {
return err
Expand Down Expand Up @@ -867,6 +872,8 @@ func doRequery(ctx context.Context,
}

func decodeSearchResults(ctx context.Context, searchResults []*internalpb.SearchResults) ([]*schemapb.SearchResultData, error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "decodeSearchResults")
defer sp.End()
tr := timerecord.NewTimeRecorder("decodeSearchResults")
results := make([]*schemapb.SearchResultData, 0)
for _, partialSearchResult := range searchResults {
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/segments/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func ReduceSearchResultsAndFillData(ctx context.Context, plan *SearchPlan, searc
cSliceTopKSPtr := (*C.int64_t)(&sliceTopKs[0])
cNumSlices := C.int64_t(len(sliceNQs))
var cSearchResultDataBlobs SearchResultDataBlobs
status := C.ReduceSearchResultsAndFillData(&cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr,
traceCtx := ParseCTraceContext(ctx)
status := C.ReduceSearchResultsAndFillData(traceCtx.ctx, &cSearchResultDataBlobs, plan.cSearchPlan, cSearchResultPtr,
cNumSegments, cSliceNQSPtr, cSliceTopKSPtr, cNumSlices)
if err := HandleCStatus(ctx, &status, "ReduceSearchResultsAndFillData failed"); err != nil {
return nil, err
Expand Down