enhance: Query slot for compaction task (#32881)
Add slot querying for compaction on the datanode, and move the control logic that limits concurrent compaction tasks from the datacoord to the datanode.

issue: #32809

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper committed May 17, 2024
1 parent 5c6de47 commit 3256026
Showing 24 changed files with 642 additions and 193 deletions.
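
The datanode-side slot accounting itself is not shown in the excerpt below. Conceptually, each datanode answers a QuerySlot request with its number of free slots: the configured slotCap minus the tasks it is already running. A minimal sketch of that idea in Go — the names freeSlots, runningCompactions, and runningImports are illustrative assumptions, not the actual Milvus datanode code:

func freeSlots(slotCap, runningCompactions, runningImports int64) int64 {
	// Free capacity is the configured cap minus everything currently running;
	// clamp at zero so an overloaded node never reports negative slots.
	used := runningCompactions + runningImports
	if used >= slotCap {
		return 0
	}
	return slotCap - used
}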
2 changes: 2 additions & 0 deletions configs/milvus.yaml
@@ -530,6 +530,8 @@ dataNode:
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
slot:
slotCap: 2 # The maximum number of tasks (e.g., compaction, importing) allowed to run concurrently on a datanode.

# Configures the system log output.
log:
28 changes: 28 additions & 0 deletions internal/datacoord/cluster.go
@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"sync"

"github.com/samber/lo"
"go.uber.org/zap"
@@ -31,6 +32,8 @@ import (
)

// Cluster provides interfaces to interact with datanode cluster
//
//go:generate mockery --name=Cluster --structname=MockCluster --output=./ --filename=mock_cluster.go --with-expecter --inpackage
type Cluster interface {
Startup(ctx context.Context, nodes []*NodeInfo) error
Register(node *NodeInfo) error
@@ -43,6 +46,7 @@ type Cluster interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
QuerySlots() map[int64]int64
GetSessions() []*Session
Close()
}
@@ -175,6 +179,30 @@ func (c *ClusterImpl) DropImport(nodeID int64, in *datapb.DropImportRequest) err
return c.sessionManager.DropImport(nodeID, in)
}

func (c *ClusterImpl) QuerySlots() map[int64]int64 {
nodeIDs := c.sessionManager.GetSessionIDs()
nodeSlots := make(map[int64]int64)
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}
for _, nodeID := range nodeIDs {
wg.Add(1)
go func(nodeID int64) {
defer wg.Done()
resp, err := c.sessionManager.QuerySlot(nodeID)
if err != nil {
log.Warn("query slot failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
mu.Lock()
defer mu.Unlock()
nodeSlots[nodeID] = resp.GetNumSlots()
}(nodeID)
}
wg.Wait()
log.Debug("query slot done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots
}

// GetSessions returns all sessions
func (c *ClusterImpl) GetSessions() []*Session {
return c.sessionManager.GetSessions()
28 changes: 28 additions & 0 deletions internal/datacoord/cluster_test.go
@@ -20,6 +20,7 @@ import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -28,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/testutils"
)

@@ -175,3 +177,29 @@ func (suite *ClusterSuite) TestFlushChannels() {
suite.NoError(err)
})
}

func (suite *ClusterSuite) TestQuerySlot() {
suite.Run("query slot failed", func() {
suite.SetupTest()
suite.mockSession.EXPECT().GetSessionIDs().Return([]int64{1}).Once()
suite.mockSession.EXPECT().QuerySlot(int64(1)).Return(nil, errors.New("mock err")).Once()
cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
nodeSlots := cluster.QuerySlots()
suite.Equal(0, len(nodeSlots))
})

suite.Run("normal", func() {
suite.SetupTest()
suite.mockSession.EXPECT().GetSessionIDs().Return([]int64{1, 2, 3, 4}).Once()
suite.mockSession.EXPECT().QuerySlot(int64(1)).Return(&datapb.QuerySlotResponse{NumSlots: 1}, nil).Once()
suite.mockSession.EXPECT().QuerySlot(int64(2)).Return(&datapb.QuerySlotResponse{NumSlots: 2}, nil).Once()
suite.mockSession.EXPECT().QuerySlot(int64(3)).Return(&datapb.QuerySlotResponse{NumSlots: 3}, nil).Once()
suite.mockSession.EXPECT().QuerySlot(int64(4)).Return(&datapb.QuerySlotResponse{NumSlots: 4}, nil).Once()
cluster := NewClusterImpl(suite.mockSession, suite.mockChManager)
nodeSlots := cluster.QuerySlots()
suite.Equal(int64(1), nodeSlots[1])
suite.Equal(int64(2), nodeSlots[2])
suite.Equal(int64(3), nodeSlots[3])
suite.Equal(int64(4), nodeSlots[4])
})
}
6 changes: 3 additions & 3 deletions internal/datacoord/compaction.go
@@ -127,15 +127,15 @@ type compactionPlanHandler struct {
stopWg sync.WaitGroup
}

func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator,
func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm ChannelManager, meta CompactionMeta, allocator allocator,
) *compactionPlanHandler {
return &compactionPlanHandler{
plans: make(map[int64]*compactionTask),
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
scheduler: NewCompactionScheduler(),
scheduler: NewCompactionScheduler(cluster),
}
}

@@ -199,7 +199,7 @@ func (c *compactionPlanHandler) start() {
// influence the schedule
go func() {
defer c.stopWg.Done()
scheduleTicker := time.NewTicker(200 * time.Millisecond)
scheduleTicker := time.NewTicker(2 * time.Second)
defer scheduleTicker.Stop()
log.Info("compaction handler start schedule")
for {
55 changes: 41 additions & 14 deletions internal/datacoord/compaction_scheduler.go
@@ -35,15 +35,17 @@ type CompactionScheduler struct {
taskGuard lock.RWMutex

planHandler *compactionPlanHandler
cluster Cluster
}

var _ Scheduler = (*CompactionScheduler)(nil)

func NewCompactionScheduler() *CompactionScheduler {
func NewCompactionScheduler(cluster Cluster) *CompactionScheduler {
return &CompactionScheduler{
taskNumber: atomic.NewInt32(0),
queuingTasks: make([]*compactionTask, 0),
parallelTasks: make(map[int64][]*compactionTask),
cluster: cluster,
}
}

@@ -62,22 +64,27 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {

// Schedule pick 1 or 0 tasks for 1 node
func (s *CompactionScheduler) Schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID

s.taskGuard.Lock()
defer s.taskGuard.Unlock()
for _, task := range s.queuingTasks {
if _, ok := nodeTasks[task.dataNodeID]; !ok {
nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
}

nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
nodeTasks := lo.GroupBy(s.queuingTasks, func(t *compactionTask) int64 {
return t.dataNodeID
})
s.taskGuard.Unlock()
if len(nodeTasks) == 0 {
return nil // To mitigate the need for frequent slot querying
}

nodeSlots := s.cluster.QuerySlots()

executable := make(map[int64]*compactionTask)

pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
// TODO: sheep, replace pickShardNode with pickAnyNode
if nodeID := s.pickShardNode(task.dataNodeID, nodeSlots); nodeID == NullNodeID {
log.Warn("cannot find datanode for compaction task", zap.Int64("planID", task.plan.PlanID), zap.String("vchannel", task.plan.Channel))
continue
}

if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
@@ -100,13 +107,11 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
return nil
}

s.taskGuard.Lock()
defer s.taskGuard.Unlock()
// pick 1 or 0 task for 1 node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
if len(parallel) >= calculateParallel() {
log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
continue
}

var (
executing = typeutil.NewSet[string]()
@@ -122,6 +127,7 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
nodeSlots[node]--
}
}

@@ -211,3 +217,24 @@ func (s *CompactionScheduler) LogStatus() {
func (s *CompactionScheduler) GetTaskCount() int {
return int(s.taskNumber.Load())
}

func (s *CompactionScheduler) pickAnyNode(nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
return nodeID
}

func (s *CompactionScheduler) pickShardNode(nodeID int64, nodeSlots map[int64]int64) int64 {
if nodeSlots[nodeID] > 0 {
return nodeID
}
return NullNodeID
}
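
Putting the scheduler pieces above together, one scheduling round roughly does the following: group the queued tasks by target datanode, skip the slot query entirely when nothing is queued, then call Cluster.QuerySlots and hand out at most one task per node that still reports a free slot. A simplified sketch of that flow — illustrative only, not the exact Milvus implementation; channel-priority policy and locking are omitted:

func scheduleOneRound(cluster Cluster, queuing []*compactionTask) []*compactionTask {
	// Group queued tasks by the datanode they are assigned to.
	nodeTasks := make(map[int64][]*compactionTask)
	for _, t := range queuing {
		nodeTasks[t.dataNodeID] = append(nodeTasks[t.dataNodeID], t)
	}
	if len(nodeTasks) == 0 {
		return nil // nothing queued, avoid querying slots
	}

	// Ask every datanode for its current number of free slots.
	nodeSlots := cluster.QuerySlots()

	// Dispatch at most one task per node per round, and only to nodes
	// that still report a free slot.
	picked := make([]*compactionTask, 0)
	for node, tasks := range nodeTasks {
		if nodeSlots[node] <= 0 {
			continue
		}
		picked = append(picked, tasks[0])
		nodeSlots[node]--
	}
	return picked
}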
52 changes: 48 additions & 4 deletions internal/datacoord/compaction_scheduler_test.go
@@ -22,7 +22,8 @@ type SchedulerSuite struct {
}

func (s *SchedulerSuite) SetupTest() {
s.scheduler = NewCompactionScheduler()
cluster := NewMockCluster(s.T())
s.scheduler = NewCompactionScheduler(cluster)
s.scheduler.parallelTasks = map[int64][]*compactionTask{
100: {
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch-1", Type: datapb.CompactionType_MixCompaction}},
@@ -39,7 +40,8 @@ func (s *SchedulerSuite) SetupTest() {
}

func (s *SchedulerSuite) TestScheduleEmpty() {
emptySch := NewCompactionScheduler()
cluster := NewMockCluster(s.T())
emptySch := NewCompactionScheduler(cluster)

tasks := emptySch.Schedule()
s.Empty(tasks)
@@ -72,6 +74,12 @@ func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.GetTaskCount())

if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{100: 0})
s.scheduler.cluster = cluster
}

// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
@@ -111,6 +119,12 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.GetTaskCount())

if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 2})
s.scheduler.cluster = cluster
}

// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
@@ -120,7 +134,12 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
return t.plan.PlanID
}))

// the second schedule returns empty for full paralleTasks
// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)

@@ -158,6 +177,12 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
s.SetupTest()
s.Require().Equal(4, s.scheduler.GetTaskCount())

if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{102: 2})
s.scheduler.cluster = cluster
}

// submit the testing tasks
s.scheduler.Submit(test.tasks...)
s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
@@ -167,7 +192,12 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
return t.plan.PlanID
}))

// the second schedule returns empty for full paralleTasks
// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
if len(gotTasks) > 0 {
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
@@ -215,3 +245,17 @@ func (s *SchedulerSuite) TestFinish() {
s.MetricsEqual(taskNum, 1)
})
}

func (s *SchedulerSuite) TestPickNode() {
s.Run("test pickAnyNode", func() {
nodeSlots := map[int64]int64{
100: 2,
101: 6,
}
node := s.scheduler.pickAnyNode(nodeSlots)
s.Equal(int64(101), node)

node = s.scheduler.pickAnyNode(map[int64]int64{})
s.Equal(int64(NullNodeID), node)
})
}
