Skip to content

Commit

Permalink
enhance: Add metautil.Channel to convert string compare to int (#32749
Browse files Browse the repository at this point in the history
)

See also #32748

This PR:

- Add `metautil.Channel` utiltiy which convert virtual name to physical
channel name, collectionID and shard idx
- Add channel mapper interface & implementation to convert limited
physical channel name into int index
- Apply `metautil.Channel` filter in querynode segment manager logic

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed May 7, 2024
1 parent 6843d6d commit 40728ce
Show file tree
Hide file tree
Showing 20 changed files with 523 additions and 159 deletions.
44 changes: 33 additions & 11 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package delegator

import (
"context"
"fmt"
"path"
"strconv"
"testing"
Expand Down Expand Up @@ -61,6 +62,8 @@ type DelegatorDataSuite struct {
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
channel metautil.Channel
mapper metautil.ChannelMapper

delegator *shardDelegator
rootPath string
Expand All @@ -71,17 +74,22 @@ func (s *DelegatorDataSuite) SetupSuite() {
paramtable.Init()
paramtable.SetNodeID(1)
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1")

s.collectionID = 1000
s.replicaID = 65535
s.vchannelName = "rootcoord-dml_1000v0"
s.version = 2000
var err error
s.mapper = metautil.NewDynChannelMapper()
s.channel, err = metautil.ParseChannel(s.vchannelName, s.mapper)
s.Require().NoError(err)
}

func (s *DelegatorDataSuite) TearDownSuite() {
paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key)
}

func (s *DelegatorDataSuite) SetupTest() {
s.collectionID = 1000
s.replicaID = 65535
s.vchannelName = "rootcoord-dml_1000_v0"
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
Expand Down Expand Up @@ -290,9 +298,10 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
defer cancel()
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
{
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
}, 0)
s.Require().NoError(err)
Expand All @@ -308,6 +317,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand All @@ -334,6 +344,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 5000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 5000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -390,6 +401,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
Version: 1,
Expand Down Expand Up @@ -424,6 +436,7 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
Version: 2,
Expand Down Expand Up @@ -482,6 +495,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -563,6 +577,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Deltalogs: []*datapb.FieldBinlog{},
Level: datapb.SegmentLevel_L0,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand All @@ -578,6 +593,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -724,6 +740,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 2},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 2},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand All @@ -750,6 +767,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -788,6 +806,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -832,6 +851,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -896,9 +916,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
defer cancel()
err := s.delegator.LoadGrowing(ctx, []*querypb.SegmentLoadInfo{
{
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
SegmentID: 1001,
CollectionID: s.collectionID,
PartitionID: 500,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
}, 0)
s.Require().NoError(err)
Expand All @@ -914,6 +935,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
Expand Down Expand Up @@ -1066,7 +1088,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
ms.EXPECT().Partition().Return(1)
ms.EXPECT().InsertCount().Return(0)
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().Shard().Return(s.vchannelName)
ms.EXPECT().Shard().Return(s.channel)
ms.EXPECT().Level().Return(datapb.SegmentLevel_L1)
s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms)
}
Expand Down
8 changes: 5 additions & 3 deletions internal/querynodev2/local_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package querynodev2

import (
"context"
"fmt"
"testing"

"github.com/samber/lo"
Expand Down Expand Up @@ -121,9 +122,10 @@ func (suite *LocalWorkerTestSuite) TestLoadSegment() {
CollectionID: suite.collectionID,
Infos: lo.Map(suite.segmentIDs, func(segID int64, _ int) *querypb.SegmentLoadInfo {
return &querypb.SegmentLoadInfo{
CollectionID: suite.collectionID,
PartitionID: suite.partitionIDs[segID%2],
SegmentID: segID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionIDs[segID%2],
SegmentID: segID,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
}
}),
Schema: schema,
Expand Down
12 changes: 11 additions & 1 deletion internal/querynodev2/segments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/cache"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// TODO maybe move to manager and change segment constructor
var channelMapper = metautil.NewDynChannelMapper()

// SegmentFilter is the interface for segment selection criteria.
type SegmentFilter interface {
Filter(segment Segment) bool
Expand Down Expand Up @@ -109,8 +113,14 @@ func WithPartition(partitionID typeutil.UniqueID) SegmentFilter {
}

func WithChannel(channel string) SegmentFilter {
ac, err := metautil.ParseChannel(channel, channelMapper)
if err != nil {
return SegmentFilterFunc(func(segment Segment) bool {
return false
})
}
return SegmentFilterFunc(func(segment Segment) bool {
return segment.Shard() == channel
return segment.Shard().Equal(ac)
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *ManagerSuite) SetupSuite() {
s.segmentIDs = []int64{1, 2, 3, 4}
s.collectionIDs = []int64{100, 200, 300, 400}
s.partitionIDs = []int64{10, 11, 12, 13}
s.channels = []string{"dml1", "dml2", "dml3", "dml4"}
s.channels = []string{"by-dev-rootcoord-dml_0_100v0", "by-dev-rootcoord-dml_1_200v0", "by-dev-rootcoord-dml_2_300v0", "by-dev-rootcoord-dml_3_400v0"}
s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed}
s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/mock_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ func genSearchPlanAndRequests(collection *Collection, segments []int64, indexTyp
iReq, _ := genSearchRequest(nq, indexType, collection)
queryReq := &querypb.SearchRequest{
Req: iReq,
DmlChannels: []string{"dml"},
DmlChannels: []string{fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collection.ID())},
SegmentIDs: segments,
Scope: querypb.DataScope_Historical,
}
Expand Down Expand Up @@ -1448,7 +1448,7 @@ func genInsertMsg(collection *Collection, partitionID, segment int64, numRows in
CollectionID: collection.ID(),
PartitionID: partitionID,
SegmentID: segment,
ShardName: "dml",
ShardName: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collection.ID()),
Timestamps: genSimpleTimestampFieldData(numRows),
RowIDs: genSimpleRowIDField(numRows),
FieldsData: fieldsData,
Expand Down
14 changes: 8 additions & 6 deletions internal/querynodev2/segments/mock_segment.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/querynodev2/segments/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package segments

import (
"context"
"fmt"
"log"
"math"
"testing"
Expand Down Expand Up @@ -82,8 +83,8 @@ func (suite *ReduceSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
Expand Down
5 changes: 3 additions & 2 deletions internal/querynodev2/segments/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package segments

import (
"context"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -91,8 +92,8 @@ func (suite *RetrieveSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
Expand Down Expand Up @@ -120,7 +121,7 @@ func (suite *RetrieveSuite) SetupTest() {
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
Expand Down
5 changes: 3 additions & 2 deletions internal/querynodev2/segments/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package segments

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -82,8 +83,8 @@ func (suite *SearchSuite) SetupTest() {
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
NumOfRows: int64(msgLength),
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (suite *SearchSuite) SetupTest() {
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID),
Level: datapb.SegmentLevel_Legacy,
},
)
Expand Down

0 comments on commit 40728ce

Please sign in to comment.