Skip to content

Commit

Permalink
enhance: expose DescribeDatabase api in proxy (milvus-io#32732)
Browse files Browse the repository at this point in the history
issue: milvus-io#32707

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored and David Pichler committed May 10, 2024
1 parent a63c2f3 commit 37c09b1
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 102 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2 h1:jgXBS8x8DTriF2pEI0RH/A+eJ8NI1f51iJcdiYEZOBg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.2/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
Expand Down
4 changes: 4 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,3 +1233,7 @@ func (s *Server) AlterDatabase(ctx context.Context, req *milvuspb.AlterDatabaseR
func (s *Server) InvalidateShardLeaderCache(ctx context.Context, req *proxypb.InvalidateShardLeaderCacheRequest) (*commonpb.Status, error) {
return s.proxy.InvalidateShardLeaderCache(ctx, req)
}

func (s *Server) DescribeDatabase(ctx context.Context, req *milvuspb.DescribeDatabaseRequest) (*milvuspb.DescribeDatabaseResponse, error) {
return s.proxy.DescribeDatabase(ctx, req)
}
6 changes: 6 additions & 0 deletions internal/distributed/proxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ func Test_NewServer(t *testing.T) {
assert.Nil(t, err)
})

t.Run("DescribeDatabase", func(t *testing.T) {
mockProxy.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(nil, nil)
_, err := server.DescribeDatabase(ctx, nil)
assert.Nil(t, err)
})

t.Run("AllocTimestamp", func(t *testing.T) {
mockProxy.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, nil)
_, err := server.AllocTimestamp(ctx, nil)
Expand Down
55 changes: 55 additions & 0 deletions internal/mocks/mock_proxy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/proto/root_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ message DescribeDatabaseResponse {
string db_name = 2;
int64 dbID = 3;
uint64 created_timestamp = 4;
repeated common.KeyValuePair properties = 5;
}

message AlterDatabaseRequest {
Expand Down
60 changes: 60 additions & 0 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,66 @@ func (node *Proxy) AlterDatabase(ctx context.Context, request *milvuspb.AlterDat
return act.result, nil
}

func (node *Proxy) DescribeDatabase(ctx context.Context, request *milvuspb.DescribeDatabaseRequest) (*milvuspb.DescribeDatabaseResponse, error) {
resp := &milvuspb.DescribeDatabaseResponse{}
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
resp.Status = merr.Status(err)
return resp, nil
}

ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeDatabase")
defer sp.End()
method := "DescribeDatabase"
tr := timerecord.NewTimeRecorder(method)

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), "").Inc()

act := &describeDatabaseTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeDatabaseRequest: request,
rootCoord: node.rootCoord,
}

log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName))

log.Debug(rpcReceived(method))

if err := node.sched.ddQueue.Enqueue(act); err != nil {
log.Warn(rpcFailedToEnqueue(method), zap.Error(err))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), "").Inc()
resp.Status = merr.Status(err)
return resp, nil
}

log.Debug(rpcEnqueued(method),
zap.Uint64("BeginTs", act.BeginTs()),
zap.Uint64("EndTs", act.EndTs()),
zap.Uint64("timestamp", request.Base.Timestamp))

if err := act.WaitToFinish(); err != nil {
log.Warn(rpcFailedToWaitToFinish(method),
zap.Error(err),
zap.Uint64("BeginTs", act.BeginTs()),
zap.Uint64("EndTs", act.EndTs()))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), "").Inc()
resp.Status = merr.Status(err)
return resp, nil
}

log.Debug(rpcDone(method),
zap.Uint64("BeginTs", act.BeginTs()),
zap.Uint64("EndTs", act.EndTs()))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), "").Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return act.result, nil
}

// CreateCollection create a collection by the schema.
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
Expand Down
52 changes: 52 additions & 0 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,58 @@ func TestProxyAlterDatabase(t *testing.T) {
})
}

func TestProxyDescribeDatabase(t *testing.T) {
paramtable.Init()

t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.DescribeDatabase(ctx, &milvuspb.DescribeDatabaseRequest{})
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})

factory := dependency.NewDefaultFactory(true)
ctx := context.Background()

node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
assert.NoError(t, err)
err = node.sched.Start()
assert.NoError(t, err)
defer node.sched.Close()

t.Run("describe database fail", func(t *testing.T) {
rc := mocks.NewMockRootCoordClient(t)
rc.On("DescribeDatabase", mock.Anything, mock.Anything).Return(nil, errors.New("fail"))
node.rootCoord = rc
ctx := context.Background()
resp, err := node.DescribeDatabase(ctx, &milvuspb.DescribeDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
})

t.Run("describe database ok", func(t *testing.T) {
rc := mocks.NewMockRootCoordClient(t)
rc.On("DescribeDatabase", mock.Anything, mock.Anything).Return(&rootcoordpb.DescribeDatabaseResponse{Status: merr.Success()}, nil)
node.rootCoord = rc
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()

resp, err := node.DescribeDatabase(ctx, &milvuspb.DescribeDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
}

func TestProxy_AllocTimestamp(t *testing.T) {
t.Run("proxy unhealthy", func(t *testing.T) {
node := &Proxy{}
Expand Down
82 changes: 5 additions & 77 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -88,10 +87,11 @@ const (
ListResourceGroupsTaskName = "ListResourceGroupsTask"
DescribeResourceGroupTaskName = "DescribeResourceGroupTask"

CreateDatabaseTaskName = "CreateCollectionTask"
DropDatabaseTaskName = "DropDatabaseTaskName"
ListDatabaseTaskName = "ListDatabaseTaskName"
AlterDatabaseTaskName = "AlterDatabaseTaskName"
CreateDatabaseTaskName = "CreateCollectionTask"
DropDatabaseTaskName = "DropDatabaseTaskName"
ListDatabaseTaskName = "ListDatabaseTaskName"
AlterDatabaseTaskName = "AlterDatabaseTaskName"
DescribeDatabaseTaskName = "DescribeDatabaseTaskName"

// minFloat32 minimum float.
minFloat32 = -1 * float32(math.MaxFloat32)
Expand Down Expand Up @@ -2512,75 +2512,3 @@ func (t *ListResourceGroupsTask) Execute(ctx context.Context) error {
func (t *ListResourceGroupsTask) PostExecute(ctx context.Context) error {
return nil
}

type alterDatabaseTask struct {
baseTask
Condition
*milvuspb.AlterDatabaseRequest
ctx context.Context
rootCoord types.RootCoordClient
result *commonpb.Status
}

func (t *alterDatabaseTask) TraceCtx() context.Context {
return t.ctx
}

func (t *alterDatabaseTask) ID() UniqueID {
return t.Base.MsgID
}

func (t *alterDatabaseTask) SetID(uid UniqueID) {
t.Base.MsgID = uid
}

func (t *alterDatabaseTask) Name() string {
return AlterDatabaseTaskName
}

func (t *alterDatabaseTask) Type() commonpb.MsgType {
return t.Base.MsgType
}

func (t *alterDatabaseTask) BeginTs() Timestamp {
return t.Base.Timestamp
}

func (t *alterDatabaseTask) EndTs() Timestamp {
return t.Base.Timestamp
}

func (t *alterDatabaseTask) SetTs(ts Timestamp) {
t.Base.Timestamp = ts
}

func (t *alterDatabaseTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
return nil
}

func (t *alterDatabaseTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_AlterCollection
t.Base.SourceID = paramtable.GetNodeID()

return nil
}

func (t *alterDatabaseTask) Execute(ctx context.Context) error {
var err error

req := &rootcoordpb.AlterDatabaseRequest{
Base: t.AlterDatabaseRequest.GetBase(),
DbName: t.AlterDatabaseRequest.GetDbName(),
DbId: t.AlterDatabaseRequest.GetDbId(),
Properties: t.AlterDatabaseRequest.GetProperties(),
}
t.result, err = t.rootCoord.AlterDatabase(ctx, req)
return err
}

func (t *alterDatabaseTask) PostExecute(ctx context.Context) error {
return nil
}

0 comments on commit 37c09b1

Please sign in to comment.