heal: Persist MRF queue in the disk during shutdown
Anis Eleuch committed Apr 4, 2024
1 parent 2228eb6 commit f65e284
Showing 13 changed files with 654 additions and 82 deletions.
2 changes: 1 addition & 1 deletion cmd/background-newdisks-heal-ops.go
@@ -469,7 +469,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	}
 
 	// Remove .healing.bin from all disks with similar heal-id
-	disks, err := z.GetDisks(poolIdx, setIdx)
+	disks, err := z.GetDisks(poolIdx, setIdx, false)
 	if err != nil {
 		return err
 	}
4 changes: 2 additions & 2 deletions cmd/erasure-healing-common_test.go
@@ -233,7 +233,7 @@ func TestListOnlineDisks(t *testing.T) {
 	data := bytes.Repeat([]byte("a"), smallFileThreshold*16)
 	z := obj.(*erasureServerPools)
 
-	erasureDisks, err := z.GetDisks(0, 0)
+	erasureDisks, err := z.GetDisks(0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -410,7 +410,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 	data := bytes.Repeat([]byte("a"), smallFileThreshold/2)
 	z := obj.(*erasureServerPools)
 
-	erasureDisks, err := z.GetDisks(0, 0)
+	erasureDisks, err := z.GetDisks(0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}
14 changes: 7 additions & 7 deletions cmd/erasure-multipart.go
@@ -1308,13 +1308,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	}
 
 	if !opts.Speedtest && versionsDisparity {
-		globalMRFState.addPartialOp(partialOperation{
-			bucket:      bucket,
-			object:      object,
-			queued:      time.Now(),
-			allVersions: true,
-			setIndex:    er.setIndex,
-			poolIndex:   er.poolIndex,
+		globalMRFState.addPartialOp(PartialOperation{
+			Bucket:      bucket,
+			Object:      object,
+			Queued:      time.Now(),
+			AllVersions: true,
+			SetIndex:    er.setIndex,
+			PoolIndex:   er.poolIndex,
 		})
 	}
 
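Across these hunks the queued type changes from the unexported partialOperation to an exported PartialOperation with capitalized fields. A plausible motivation is that Go's standard encoders only see exported fields, so the rename is what makes queue entries serializable, the prerequisite for writing the MRF queue to disk at shutdown. The sketch below illustrates that idea with encoding/gob; the field names come from the hunks on this page, but the field types and the choice of encoder are assumptions, the commit's actual on-disk format lives in files not shown here and may differ (e.g. msgp).

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

// Hypothetical mirror of the exported struct seen in this diff; field names
// are taken from the hunks above, field types are assumed for illustration.
type PartialOperation struct {
	Bucket      string
	Object      string
	VersionID   string
	Queued      time.Time
	AllVersions bool
	SetIndex    int
	PoolIndex   int
	BitrotScan  bool
}

func main() {
	op := PartialOperation{Bucket: "mybucket", Object: "a/b/object", Queued: time.Now()}

	// Only exported (capitalized) fields survive an encode/decode round trip,
	// which is why persisting the queue requires exporting them.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(op); err != nil {
		panic(err)
	}

	var restored PartialOperation
	if err := gob.NewDecoder(&buf).Decode(&restored); err != nil {
		panic(err)
	}
	fmt.Printf("%s/%s queued at %s\n", restored.Bucket, restored.Object, restored.Queued.Format(time.RFC3339))
}
```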
64 changes: 28 additions & 36 deletions cmd/erasure-object.go
@@ -383,24 +383,16 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
 		// that we have some parts or data blocks missing or corrupted
 		// - attempt a heal to successfully heal them for future calls.
 		if written == partLength {
-			var scan madmin.HealScanMode
-			switch {
-			case errors.Is(err, errFileNotFound):
-				scan = madmin.HealNormalScan
-			case errors.Is(err, errFileCorrupt):
-				scan = madmin.HealDeepScan
-			}
-			switch scan {
-			case madmin.HealNormalScan, madmin.HealDeepScan:
+			if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) {
 				healOnce.Do(func() {
-					globalMRFState.addPartialOp(partialOperation{
-						bucket:    bucket,
-						object:    object,
-						versionID: fi.VersionID,
-						queued:    time.Now(),
-						setIndex:  er.setIndex,
-						poolIndex: er.poolIndex,
-						scanMode:  scan,
+					globalMRFState.addPartialOp(PartialOperation{
+						Bucket:     bucket,
+						Object:     object,
+						VersionID:  fi.VersionID,
+						Queued:     time.Now(),
+						SetIndex:   er.setIndex,
+						PoolIndex:  er.poolIndex,
+						BitrotScan: errors.Is(err, errFileCorrupt),
 					})
 				})
 				// Healing is triggered and we have written
@@ -827,13 +819,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	// additionally do not heal delete markers inline, let them be
 	// healed upon regular heal process.
 	if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks {
-		globalMRFState.addPartialOp(partialOperation{
-			bucket:    fi.Volume,
-			object:    fi.Name,
-			versionID: fi.VersionID,
-			queued:    time.Now(),
-			setIndex:  er.setIndex,
-			poolIndex: er.poolIndex,
+		globalMRFState.addPartialOp(PartialOperation{
+			Bucket:    fi.Volume,
+			Object:    fi.Name,
+			VersionID: fi.VersionID,
+			Queued:    time.Now(),
+			SetIndex:  er.setIndex,
+			PoolIndex: er.poolIndex,
 		})
 	}
 
@@ -1579,13 +1571,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 				break
 			}
 		} else {
-			globalMRFState.addPartialOp(partialOperation{
-				bucket:      bucket,
-				object:      object,
-				queued:      time.Now(),
-				allVersions: true,
-				setIndex:    er.setIndex,
-				poolIndex:   er.poolIndex,
+			globalMRFState.addPartialOp(PartialOperation{
+				Bucket:      bucket,
+				Object:      object,
+				Queued:      time.Now(),
+				AllVersions: true,
+				SetIndex:    er.setIndex,
+				PoolIndex:   er.poolIndex,
 			})
 		}
 	}
@@ -2076,11 +2068,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 // Send the successful but partial upload/delete, however ignore
 // if the channel is blocked by other items.
 func (er erasureObjects) addPartial(bucket, object, versionID string) {
-	globalMRFState.addPartialOp(partialOperation{
-		bucket:    bucket,
-		object:    object,
-		versionID: versionID,
-		queued:    time.Now(),
+	globalMRFState.addPartialOp(PartialOperation{
+		Bucket:    bucket,
+		Object:    object,
+		VersionID: versionID,
+		Queued:    time.Now(),
 	})
 }
 
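The refactor in getObjectWithFileInfo also drops the precomputed madmin.HealScanMode: instead of storing a scan mode, the queued PartialOperation now carries a single BitrotScan boolean, set when the read failed with errFileCorrupt. Whatever consumes the queue therefore has to map that flag back to a scan mode; a hypothetical helper for that mapping (not part of this diff, and assuming the madmin package is imported as in the removed code above) could look like this:

```go
// scanModeFor is a hypothetical helper: BitrotScan corresponds to the old
// errFileCorrupt / HealDeepScan branch, everything else to HealNormalScan.
func scanModeFor(op PartialOperation) madmin.HealScanMode {
	if op.BitrotScan {
		return madmin.HealDeepScan
	}
	return madmin.HealNormalScan
}
```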
28 changes: 24 additions & 4 deletions cmd/erasure-server-pool.go
@@ -189,6 +189,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 	})
 
 	bootstrapTrace("initHealMRF", func() {
+		go globalMRFState.start()
 		go globalMRFState.healRoutine(z)
 	})
 
@@ -294,11 +295,30 @@ func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string
 }
 
 // Return the disks belonging to the poolIdx, and setIdx.
-func (z *erasureServerPools) GetDisks(poolIdx, setIdx int) ([]StorageAPI, error) {
-	if poolIdx < len(z.serverPools) && setIdx < len(z.serverPools[poolIdx].sets) {
-		return z.serverPools[poolIdx].sets[setIdx].getDisks(), nil
+func (z *erasureServerPools) GetDisks(poolIdx, setIdx int, local bool) (disks []StorageAPI, err error) {
+	if poolIdx >= 0 {
+		if poolIdx > len(z.serverPools) || setIdx > len(z.serverPools[poolIdx].sets) {
+			return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+		}
+		disks = z.serverPools[poolIdx].sets[setIdx].getDisks()
+	} else {
+		// poolIdx < 0 means collect all disks in this cluster
+		for pool := range z.serverPools {
+			for set := range z.serverPools[pool].sets {
+				disks = append(disks, z.serverPools[pool].sets[set].getDisks()...)
+			}
+		}
 	}
-	return nil, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
+
+	if local {
+		for d := range disks {
+			if disks[d] != nil && !disks[d].IsLocal() {
+				disks[d] = nil
+			}
+		}
+	}
+
+	return disks, nil
 }
 
 // Return the count of disks in each pool
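The new signature folds two behaviors into GetDisks: a negative poolIdx collects every disk across all pools and sets, and local=true masks non-local disks to nil rather than dropping them, so slice positions keep lining up with the set layout. Hypothetical call sites (not taken from the diff), assuming z is the *erasureServerPools used throughout this file:

```go
// Disks of one specific erasure set, local and remote alike:
setDisks, err := z.GetDisks(poolIdx, setIdx, false)
if err != nil {
	return err
}

// Every disk in the cluster, with remote disks replaced by nil entries:
localDisks, err := z.GetDisks(-1, -1, true)
if err != nil {
	return err
}
```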
2 changes: 1 addition & 1 deletion cmd/globals.go
@@ -388,7 +388,7 @@ var (
 	globalBackgroundHealState = newHealState(GlobalContext, false)
 
 	globalMRFState = mrfState{
-		opCh: make(chan partialOperation, mrfOpsQueueSize),
+		opCh: make(chan PartialOperation, mrfOpsQueueSize),
 	}
 
 	// If writes to FS backend should be O_SYNC.
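globals.go now queues exported PartialOperation values on opCh, and newErasureServerPools starts globalMRFState.start() alongside healRoutine. The body of start() is not among the hunks shown on this page; purely as a sketch of the pattern named in the commit title (persist the pending MRF queue to disk during shutdown), a drain-and-flush loop could look like the following. Every name here (drainAndPersist, queueFile, the gob encoding) is an assumption for illustration, not MinIO's implementation.

```go
package main

import (
	"context"
	"encoding/gob"
	"os"
	"time"
)

// Hypothetical stand-in; see the PartialOperation sketch earlier on this page.
type PartialOperation struct {
	Bucket, Object, VersionID string
	Queued                    time.Time
}

// drainAndPersist consumes heal requests from a bounded channel and, when the
// context is cancelled at shutdown, writes whatever is still pending to disk
// so it can be replayed on the next start. Sketch only; not MinIO code.
func drainAndPersist(ctx context.Context, opCh <-chan PartialOperation, queueFile string) error {
	var pending []PartialOperation
	for {
		select {
		case op := <-opCh:
			// Remember the pending heal request; a real implementation would
			// also hand it to the healing routine.
			pending = append(pending, op)
		case <-ctx.Done():
			f, err := os.Create(queueFile)
			if err != nil {
				return err
			}
			defer f.Close()
			return gob.NewEncoder(f).Encode(pending)
		}
	}
}

func main() {} // placeholder so the sketch compiles as a standalone file
```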
