enhance: Load BF from storage instead of memory during L0 compaction (#32913)

To decouple compaction from the shard, load the BF from storage instead of from memory during L0 compaction in the datanode.

issue: #32809

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper committed May 17, 2024
1 parent 0d0eda2 commit bcaacf6
Showing 4 changed files with 305 additions and 84 deletions.
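
In outline, the task now rebuilds each target segment's primary-key bloom filter by reading that segment's stats binlogs through the ChunkManager, instead of borrowing the BloomFilterSet cached in the shard's metacache. Below is a minimal, serial sketch of that flow using names from the diff that follows; the real loadBF additionally decompresses the binlog paths and fans the work out to a worker pool, and imports are elided since this would live in the same datanode package. The schema parameter type is an assumption.

// Sketch only: for every target (non-L0) segment, read its pk stats binlogs
// from object storage and build a BloomFilterSet owned by the compaction task,
// keyed by segment ID so splitDelta can look it up.
func buildSegmentBFs(
	ctx context.Context,
	cm storage.ChunkManager,
	schema *schemapb.CollectionSchema, // assumed parameter type of loadStats
	segments []*datapb.CompactionSegmentBinlogs,
) (map[int64]*metacache.BloomFilterSet, error) {
	bfs := make(map[int64]*metacache.BloomFilterSet, len(segments))
	for _, segment := range segments {
		pks, err := loadStats(ctx, cm, schema, segment.GetSegmentID(), segment.GetField2StatslogPaths())
		if err != nil {
			return nil, err
		}
		bfs[segment.GetSegmentID()] = metacache.NewBloomFilterSet(pks...)
	}
	return bfs, nil
}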
internal/datanode/data_sync_service.go (12 changes: 4 additions & 8 deletions)
@@ -254,21 +254,17 @@ func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *s
log := log.With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))

// get pkfield id
pkField := int64(-1)
for _, field := range schema.Fields {
if field.IsPrimaryKey {
pkField = field.FieldID
break
}
pkField, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
return nil, err
}

// filter stats binlog files which is pk field stats log
bloomFilterFiles := []string{}
logType := storage.DefaultStatsType

for _, binlog := range statsBinlogs {
if binlog.FieldID != pkField {
if binlog.FieldID != pkField.GetFieldID() {
continue
}
Loop:
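
The removed loop that scanned schema.Fields for the primary key is replaced by typeutil.GetPrimaryFieldSchema, which returns the whole field schema (hence the .GetFieldID() call above) and an error when no primary key exists. A rough equivalent inferred from the removed code, with imports elided; the concrete schema type and error text are assumptions:

// Mirror of the removed loop: return the field flagged as the primary key,
// or an error if the schema declares none.
func getPrimaryFieldSchema(schema *schemapb.CollectionSchema) (*schemapb.FieldSchema, error) {
	for _, field := range schema.GetFields() {
		if field.GetIsPrimaryKey() {
			return field, nil
		}
	}
	// The real typeutil helper defines its own error; this message is illustrative.
	return nil, fmt.Errorf("primary field not found in schema")
}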
internal/datanode/l0_compactor.go (112 changes: 88 additions & 24 deletions)
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/samber/lo"
@@ -38,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -54,6 +56,7 @@ type levelZeroCompactionTask struct {
allocator allocator.Allocator
metacache metacache.MetaCache
syncmgr syncmgr.SyncManager
cm storage.ChunkManager

plan *datapb.CompactionPlan

@@ -70,6 +73,7 @@ func newLevelZeroCompactionTask(
alloc allocator.Allocator,
metaCache metacache.MetaCache,
syncmgr syncmgr.SyncManager,
cm storage.ChunkManager,
plan *datapb.CompactionPlan,
) *levelZeroCompactionTask {
ctx, cancel := context.WithCancel(ctx)
@@ -81,6 +85,7 @@
allocator: alloc,
metacache: metaCache,
syncmgr: syncmgr,
cm: cm,
plan: plan,
tr: timerecord.NewTimeRecorder("levelzero compaction"),
done: make(chan struct{}, 1),
@@ -129,13 +134,10 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return s.Level == datapb.SegmentLevel_L0
})

targetSegIDs := lo.FilterMap(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) (int64, bool) {
if s.Level == datapb.SegmentLevel_L1 {
return s.GetSegmentID(), true
}
return 0, false
targetSegments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level != datapb.SegmentLevel_L0
})
if len(targetSegIDs) == 0 {
if len(targetSegments) == 0 {
log.Warn("compact wrong, not target sealed segments")
return nil, errIllegalCompactionPlan
}
@@ -165,9 +167,9 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
var resultSegments []*datapb.CompactionSegment

if float64(hardware.GetFreeMemoryCount())*paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() < float64(totalSize) {
resultSegments, err = t.linearProcess(ctxTimeout, targetSegIDs, totalDeltalogs)
resultSegments, err = t.linearProcess(ctxTimeout, targetSegments, totalDeltalogs)
} else {
resultSegments, err = t.batchProcess(ctxTimeout, targetSegIDs, lo.Values(totalDeltalogs)...)
resultSegments, err = t.batchProcess(ctxTimeout, targetSegments, lo.Values(totalDeltalogs)...)
}
if err != nil {
return nil, err
@@ -188,65 +190,87 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return result, nil
}

func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []int64, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)

// just for logging
targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 {
return segment.GetSegmentID()
})

var (
resultSegments = make(map[int64]*datapb.CompactionSegment)
alteredSegments = make(map[int64]*storage.DeleteData)
)

segmentBFs, err := t.loadBF(targetSegments)
if err != nil {
return nil, err
}
for segID, deltaLogs := range totalDeltalogs {
log := log.With(zap.Int64("levelzero segment", segID))

log.Info("Linear L0 compaction start processing segment")
allIters, err := t.loadDelta(ctx, deltaLogs)
if err != nil {
log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}

t.splitDelta(ctx, allIters, alteredSegments, targetSegments)
t.splitDelta(ctx, allIters, alteredSegments, segmentBFs)

err = t.uploadByCheck(ctx, true, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
}

err := t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegments), zap.Error(err))
log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegmentIDs), zap.Error(err))
return nil, err
}
log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
return lo.Values(resultSegments), nil
}

func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)

// just for logging
targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 {
return segment.GetSegmentID()
})

log.Info("Batch L0 compaction start processing")
resultSegments := make(map[int64]*datapb.CompactionSegment)

iters, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
if err != nil {
log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}

segmentBFs, err := t.loadBF(targetSegments)
if err != nil {
return nil, err
}

alteredSegments := make(map[int64]*storage.DeleteData)
t.splitDelta(ctx, iters, alteredSegments, targetSegments)
t.splitDelta(ctx, iters, alteredSegments, segmentBFs)

err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err))
return nil, err
}
log.Info("Batch L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
Expand All @@ -271,18 +295,20 @@ func (t *levelZeroCompactionTask) splitDelta(
ctx context.Context,
allIters []*iter.DeltalogIterator,
targetSegBuffer map[int64]*storage.DeleteData,
targetSegIDs []int64,
segmentBfs map[int64]*metacache.BloomFilterSet,
) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()

// segments shall be safe to read outside
segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(targetSegIDs...))
split := func(pk storage.PrimaryKey) []int64 {
lc := storage.NewLocationsCache(pk)
return lo.FilterMap(segments, func(segment *metacache.SegmentInfo, _ int) (int64, bool) {
return segment.SegmentID(), segment.GetBloomFilterSet().PkExists(lc)
})
predicts := make([]int64, 0, len(segmentBfs))
for segmentID, bf := range segmentBfs {
if bf.PkExists(lc) {
predicts = append(predicts, segmentID)
}
}
return predicts
}

// spilt all delete data to segments
@@ -395,3 +421,41 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec

return nil
}

func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
)

var (
futures = make([]*conc.Future[any], 0, len(targetSegments))
pool = getOrCreateStatsPool()

mu = &sync.Mutex{}
bfs = make(map[int64]*metacache.BloomFilterSet)
)

for _, segment := range targetSegments {
segment := segment
future := pool.Submit(func() (any, error) {
_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
pks, err := loadStats(t.ctx, t.cm,
t.metacache.Schema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
return err, err
}
bf := metacache.NewBloomFilterSet(pks...)
mu.Lock()
defer mu.Unlock()
bfs[segment.GetSegmentID()] = bf
return nil, nil
})
futures = append(futures, future)
}

err := conc.AwaitAll(futures...)
return bfs, err
}
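
For context, a hypothetical caller-side sketch of the new wiring; only alloc, metaCache, syncmgr, cm, and plan are visible in this diff, so the leading parameter names below are placeholders rather than the real signature:

// Hypothetical wiring: the task now receives a storage.ChunkManager so that
// compact() can rebuild bloom filters from stats binlogs via loadBF instead of
// reading them from the shard's metacache.
task := newLevelZeroCompactionTask(ctx, io, alloc, metaCache, syncMgr, chunkManager, plan)
result, err := task.compact() // loadDelta -> loadBF -> splitDelta -> uploadByCheck
if err != nil {
	log.Warn("L0 compaction failed", zap.Error(err))
}
_ = result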
