Commit
fix: add request resource timeout for lazy load, refactor context usage in cache (#32709)

issue: #32663

- Use a new parameter to control the request resource timeout for lazy load.

- Remove the timeout parameter of `Do` and remove `DoWait`; use `context`
to control the timeout.

- Use `VersionedNotifier` to avoid losing notify events and to broadcast them,
and remove the redundant goroutine in cache.

Related dev PR: #32684

Signed-off-by: chyezh <[email protected]>
chyezh committed May 7, 2024
1 parent b1eacb2 commit 641f702
Showing 12 changed files with 254 additions and 179 deletions.
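Before the per-file diffs, here is a minimal sketch of the cache API change described above: the explicit timeout argument of `DoWait` is replaced by a deadline carried on the `context`. Only the `Do`/`withLazyLoadTimeoutContext` call pattern is taken from this commit; the cache type, timeout value, and `main` harness are invented for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// segCache is a stand-in for mgr.DiskCache; only the context-driven Do
// signature mirrors the real code, the body here is illustrative.
type segCache struct{}

// Do runs fn for the given segment under the deadline carried by ctx.
// The old DoWait(ctx, segID, timeout, fn) took the timeout as an argument.
func (segCache) Do(ctx context.Context, segID int64, fn func(context.Context) error) (missing bool, err error) {
	done := make(chan error, 1)
	go func() { done <- fn(ctx) }()
	select {
	case <-ctx.Done():
		return false, context.Cause(ctx)
	case e := <-done:
		return false, e
	}
}

// errLazyLoadTimeout plays the role of the sentinel added in utils.go.
var errLazyLoadTimeout = errors.New("lazy load time out")

// withLazyLoadTimeoutContext mirrors the helper added in utils.go, with the
// paramtable lookup replaced by a constant for this sketch.
func withLazyLoadTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
	return context.WithTimeoutCause(ctx, 30*time.Second, errLazyLoadTimeout)
}

func main() {
	cache := segCache{}
	ctx, cancel := withLazyLoadTimeoutContext(context.Background())
	defer cancel()
	// The timeout now travels on ctx instead of being a separate DoWait argument.
	_, err := cache.Do(ctx, 1, func(ctx context.Context) error { return nil })
	fmt.Println(err) // <nil>
}
```

As in the old code, `Do` still reports whether the segment was missing from the cache; only the way the timeout reaches the cache changes.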
10 changes: 8 additions & 2 deletions configs/milvus.yaml
@@ -102,7 +102,9 @@ minio:
region: # Specify minio storage system location region
useVirtualHost: false # Whether use virtual host mode for bucket
requestTimeoutMs: 10000 # minio timeout for request time in milliseconds
listObjectsMaxKeys: 0 # The maximum number of objects requested per batch in minio ListObjects rpc, 0 means using oss client by default, decrease these configration if ListObjects timeout
# The maximum number of objects requested per batch in minio ListObjects rpc,
# 0 means using oss client by default; decrease this configuration if ListObjects times out
listObjectsMaxKeys: 0

# Milvus supports four MQs: rocksmq(based on RocksDB), natsmq(embedded nats-server), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
@@ -214,6 +216,7 @@ proxy:
ginLogging: true
ginLogSkipPaths: / # skip url path for gin log
maxTaskNum: 1024 # max task number of proxy task queue
mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection
accessLog:
enable: false # if use access log
minioEnable: false # if upload sealed access log file to minio
@@ -335,6 +338,9 @@ queryNode:
mmapEnabled: false # Enable mmap for loading data
mmapEnabled: false # Enable mmap for loading data
lazyloadEnabled: false # Enable lazyload for loading data
lazyloadWaitTimeout: 30000 # max wait timeout duration in milliseconds before starting a lazy-load search or retrieve
lazyLoadRequestResourceTimeout: 5000 # max timeout in milliseconds to wait for request resources for lazy load, 5s by default
lazyLoadRequestResourceRetryInterval: 2000 # retry interval in milliseconds when waiting for request resources for lazy load, 2s by default
grouping:
enabled: true
maxNQ: 1000
@@ -367,7 +373,6 @@ queryNode:
maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
enableSegmentPrune: false # use partition prune function on shard delegator
useStreamComputing: false
ip: # if not specified, use the first unicastable address
port: 21123
grpc:
@@ -717,6 +722,7 @@ quotaAndLimits:
limits:
maxCollectionNum: 65536
maxCollectionNumPerDB: 65536
maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
limitWriting:
# forceDeny false means dml requests are allowed (except for some
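A hedged reading of the new lazy-load knobs above, based only on their names and the loader change further down: `lazyloadWaitTimeout` bounds how long a lazy search or retrieve waits for the segment overall, while `lazyLoadRequestResourceTimeout` and `lazyLoadRequestResourceRetryInterval` bound the resource-acquisition retry loop in `requestResourceWithTimeout`. With the defaults, that loop makes roughly three attempts:

```go
package main

import "fmt"

func main() {
	// Default values from the YAML above, in milliseconds. The attempt count is
	// an illustration of the retry loop in requestResourceWithTimeout, not a
	// value computed anywhere in this commit.
	const (
		requestResourceTimeoutMs = 5000
		retryIntervalMs          = 2000
	)
	attempts := 1 + requestResourceTimeoutMs/retryIntervalMs // first try, then retries every interval
	fmt.Printf("at most ~%d resource requests before the lazy load gives up\n", attempts)
}
```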
2 changes: 1 addition & 1 deletion internal/querynodev2/segments/manager.go
@@ -156,7 +156,7 @@ type Manager struct {
}

func NewManager() *Manager {
diskCap := paramtable.Get().QueryNodeCfg.DiskCacheCapacityLimit.GetAsInt64()
diskCap := paramtable.Get().QueryNodeCfg.DiskCacheCapacityLimit.GetAsSize()

segMgr := NewSegmentManager()
sf := singleflight.Group{}
14 changes: 7 additions & 7 deletions internal/querynodev2/segments/retrieve.go
@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -86,16 +85,17 @@ func retrieveOnSegments(ctx context.Context, mgr *Manager, segments []Segment, s
}()

if seg.IsLazyLoad() {
var timeout time.Duration
timeout, err = lazyloadWaitTimeout(ctx)
if err != nil {
return err
}
ctx, cancel := withLazyLoadTimeoutContext(ctx)
defer cancel()

var missing bool
missing, err = mgr.DiskCache.DoWait(ctx, seg.ID(), timeout, retriever)
missing, err = mgr.DiskCache.Do(ctx, seg.ID(), retriever)
if missing {
accessRecord.CacheMissing()
}
if err != nil {
log.Warn("failed to do query disk cache", zap.Int64("segID", seg.ID()), zap.Error(err))
}
return err
}
return retriever(ctx, seg)
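In both the retrieve and search paths, the timeout context is derived inside the per-segment worker rather than once for the whole request. A small sketch of why that placement matters; the errgroup usage and the helper are simplified, and only the derive-inside-the-closure, `defer cancel()` pattern reflects the diff:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// withLazyLoadTimeoutContext stands in for the helper added in utils.go; the
// constant replaces the QueryNodeCfg.LazyLoadWaitTimeout paramtable lookup.
func withLazyLoadTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
	return context.WithTimeout(ctx, 30*time.Second)
}

func main() {
	segmentIDs := []int64{1, 2, 3}
	eg, ctx := errgroup.WithContext(context.Background())
	for _, segID := range segmentIDs {
		segID := segID
		eg.Go(func() error {
			// Each segment derives its own deadline from the shared request ctx,
			// so one slow lazy load does not eat into another segment's budget,
			// and cancel() releases the timer as soon as this segment is done.
			segCtx, cancel := withLazyLoadTimeoutContext(ctx)
			defer cancel()
			_ = segCtx // the real code passes segCtx into mgr.DiskCache.Do
			fmt.Println("handled segment", segID)
			return nil
		})
	}
	_ = eg.Wait()
}
```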
42 changes: 12 additions & 30 deletions internal/querynodev2/segments/search.go
@@ -20,18 +20,15 @@ import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@@ -82,16 +79,17 @@ func searchSegments(ctx context.Context, mgr *Manager, segments []Segment, segTy
}()

if seg.IsLazyLoad() {
var timeout time.Duration
timeout, err = lazyloadWaitTimeout(ctx)
if err != nil {
return err
}
ctx, cancel := withLazyLoadTimeoutContext(ctx)
defer cancel()

var missing bool
missing, err = mgr.DiskCache.DoWait(ctx, seg.ID(), timeout, searcher)
missing, err = mgr.DiskCache.Do(ctx, seg.ID(), searcher)
if missing {
accessRecord.CacheMissing()
}
if err != nil {
log.Warn("failed to do search for disk cache", zap.Int64("segID", seg.ID()), zap.Error(err))
}
return err
}
return searcher(ctx, seg)
Expand Down Expand Up @@ -171,18 +169,16 @@ func searchSegmentsStreamly(ctx context.Context,
}()
if seg.IsLazyLoad() {
log.Debug("before doing stream search in DiskCache", zap.Int64("segID", seg.ID()))
var timeout time.Duration
timeout, err = lazyloadWaitTimeout(ctx)
if err != nil {
return err
}
ctx, cancel := withLazyLoadTimeoutContext(ctx)
defer cancel()

var missing bool
missing, err = mgr.DiskCache.DoWait(ctx, seg.ID(), timeout, searcher)
missing, err = mgr.DiskCache.Do(ctx, seg.ID(), searcher)
if missing {
accessRecord.CacheMissing()
}
if err != nil {
log.Error("failed to do search for disk cache", zap.Int64("seg_id", seg.ID()), zap.Error(err))
log.Warn("failed to do search for disk cache", zap.Int64("segID", seg.ID()), zap.Error(err))
}
log.Debug("after doing stream search in DiskCache", zap.Int64("segID", seg.ID()), zap.Error(err))
return err
@@ -203,20 +199,6 @@
return nil
}

func lazyloadWaitTimeout(ctx context.Context) (time.Duration, error) {
timeout := params.Params.QueryNodeCfg.LazyLoadWaitTimeout.GetAsDuration(time.Millisecond)
deadline, ok := ctx.Deadline()
if ok {
remain := time.Until(deadline)
if remain <= 0 {
return -1, merr.WrapErrServiceInternal("search context deadline exceeded")
} else if remain < timeout {
timeout = remain
}
}
return timeout, nil
}

// search will search on the target segments in historical.
// if segIDs is not specified, it will search on all the historical segments specified by partIDs.
// if segIDs is specified, it will only search on the segments specified by the segIDs.
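The removed `lazyloadWaitTimeout` helper above manually clamped the configured timeout to the caller's remaining deadline. Deriving a child context, as the new code does, gives the same clamping for free, since a child context never outlives its parent. A minimal standalone demonstration with arbitrary values:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// The caller's context has only ~100ms left.
	parent, cancelParent := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancelParent()

	// The configured lazy-load wait timeout is much larger (30s), but the child
	// context keeps the earlier of the two deadlines automatically.
	child, cancelChild := context.WithTimeout(parent, 30*time.Second)
	defer cancelChild()

	deadline, _ := child.Deadline()
	fmt.Println("effective wait:", time.Until(deadline).Round(10*time.Millisecond)) // ~100ms, not 30s
}
```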
31 changes: 26 additions & 5 deletions internal/querynodev2/segments/segment_loader.go
@@ -54,6 +54,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -1119,17 +1120,37 @@ func (loader *segmentLoader) LoadLazySegment(ctx context.Context,
segment *LocalSegment,
loadInfo *querypb.SegmentLoadInfo,
) (err error) {
infos := []*querypb.SegmentLoadInfo{loadInfo}
resource, _, err := loader.requestResource(ctx, infos...)
log := log.Ctx(ctx)
resource, _, err := loader.requestResourceWithTimeout(ctx, loadInfo)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return merr.ErrServiceResourceInsufficient
log.Ctx(ctx).Warn("request resource failed", zap.Error(err))
return err
}
defer loader.freeRequest(resource)

return loader.LoadSegment(ctx, segment, loadInfo)
}

// requestResourceWithTimeout requests memory & storage to load segments with a timeout and retry.
func (loader *segmentLoader) requestResourceWithTimeout(ctx context.Context, infos ...*querypb.SegmentLoadInfo) (LoadResource, int, error) {
timeout := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceTimeout.GetAsDuration(time.Millisecond)
retryInterval := paramtable.Get().QueryNodeCfg.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond)

// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
ctx, cancel := contextutil.WithTimeoutCause(ctx, timeout, merr.ErrServiceResourceInsufficient)
defer cancel()
for {
resource, concurrencyLevel, err := loader.requestResource(ctx, infos...)
if err == nil {
return resource, concurrencyLevel, nil
}
select {
case <-ctx.Done():
return LoadResource{}, -1, context.Cause(ctx)
case <-time.After(retryInterval):
}
}
}

func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBinlog, pkFieldID int64) ([]string, storage.StatsLogType) {
result := make([]string, 0)
for _, fieldBinlog := range fieldBinlogs {
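The retry loop above returns `context.Cause(ctx)` when the deadline fires, so callers see the resource-insufficiency error rather than a generic `context deadline exceeded`. A sketch of that cause propagation, using the Go 1.21 standard-library helper that the TODO comments mention as the eventual replacement for `contextutil.WithTimeoutCause` (assumed here to behave the same way); the error value is a stand-in for `merr.ErrServiceResourceInsufficient`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-in for merr.ErrServiceResourceInsufficient.
var errResourceInsufficient = errors.New("service resource insufficient")

func main() {
	ctx, cancel := context.WithTimeoutCause(context.Background(), 10*time.Millisecond, errResourceInsufficient)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context deadline exceeded
	fmt.Println(context.Cause(ctx)) // service resource insufficient
}
```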
14 changes: 14 additions & 0 deletions internal/querynodev2/segments/utils.go
@@ -13,10 +13,12 @@ import "C"

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"strconv"
"time"

"go.uber.org/zap"

@@ -27,9 +29,14 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var errLazyLoadTimeout = merr.WrapErrServiceInternal("lazy load time out")

func GetPkField(schema *schemapb.CollectionSchema) *schemapb.FieldSchema {
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
@@ -192,3 +199,10 @@ func FilterZeroValuesFromSlice(intVals []int64) []int64 {
}
return result
}

// withLazyLoadTimeoutContext returns a new context with lazy load timeout.
func withLazyLoadTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
lazyLoadTimeout := paramtable.Get().QueryNodeCfg.LazyLoadWaitTimeout.GetAsDuration(time.Millisecond)
// TODO: use context.WithTimeoutCause instead of contextutil.WithTimeoutCause in go1.21
return contextutil.WithTimeoutCause(ctx, lazyLoadTimeout, errLazyLoadTimeout)
}
