Skip to content

Commit

Permalink
fix: since DataDir is unique, avoid long locks
Browse files Browse the repository at this point in the history
getObjectNInfo() current holds the lock until the
client has entirely read the full content from
the disk, this is not needed because the dataDir
that is present on disk is unique and could never
be overwritten concurrently by another caller.

It is sufficient to verify xl.meta and its quorum
validity under quorum, once that is achieved we
do not ever have to hold the lock further.

Currently clients speed would dictate the time
the lock is held for READ operation on MinIO,
this is not needed and we can make this lock-free
, allowing mutations to serialize on each other
but not READs.

This allows for applications like fixes minio#16314
  • Loading branch information
harshavardhana committed Apr 17, 2023
1 parent 18515a4 commit 309308b
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 134 deletions.
2 changes: 1 addition & 1 deletion cmd/batch-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
Versioned: versioned,
VersionSuspended: versionSuspended,
}
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, readLock, opts)
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, opts)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
VersionID: objInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)

gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
VersionID: objInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
Expand Down
7 changes: 1 addition & 6 deletions cmd/config-common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ import (
var errConfigNotFound = errors.New("config file not found")

func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string, opts ObjectOptions) ([]byte, ObjectInfo, error) {
lockType := readLock
if opts.NoLock {
lockType = noLock // erasureObjects.GetObjectNInfo honors lockType argument but not opts.NoLock.
}

r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, lockType, opts)
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, opts)
if err != nil {
if isErrObjectNotFound(err) {
return nil, ObjectInfo{}, errConfigNotFound
Expand Down
4 changes: 2 additions & 2 deletions cmd/data-usage-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ func (d *dataUsageCache) merge(other dataUsageCache) {
}

type objectIO interface {
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (reader *GetObjectReader, err error)
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (reader *GetObjectReader, err error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
}

Expand All @@ -889,7 +889,7 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
// Caches are read+written without locks,
retries := 0
for retries < 5 {
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{NoLock: true})
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
if err != nil {
switch err.(type) {
case ObjectNotFound, BucketNotFound:
Expand Down
30 changes: 15 additions & 15 deletions cmd/disk-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type CacheStorageInfo struct {
// CacheObjectLayer implements primitives for cache object API layer.
type CacheObjectLayer interface {
// Object operations.
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error)
Expand Down Expand Up @@ -117,7 +117,7 @@ type cacheObjects struct {
// Cache stats
cacheStats *CacheStats

InnerGetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
InnerGetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error)
InnerGetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
InnerDeleteObjectFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
InnerPutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
Expand Down Expand Up @@ -231,16 +231,16 @@ func (c *cacheObjects) incCacheStats(size int64) {
c.cacheStats.incBytesServed(size)
}

func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
if c.isCacheExclude(bucket, object) || c.skipCache() {
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
}
var cc *cacheControl
var cacheObjSize int64
// fetch diskCache if object is currently cached or nearest available cache drive
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
}

cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
if cc != nil && cc.noStore {
cacheReader.Close()
c.cacheStats.incMiss()
bReader, err := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
bReader, err := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
bReader.ObjInfo.CacheLookupStatus = CacheHit
bReader.ObjInfo.CacheStatus = CacheMiss
return bReader, err
Expand Down Expand Up @@ -304,7 +304,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
cacheReader.Close()
}
c.cacheStats.incMiss()
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
}
// skip cache for objects with locks
objRetention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
Expand All @@ -314,7 +314,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
cacheReader.Close()
}
c.cacheStats.incMiss()
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
}
if cacheErr == nil {
// if ETag matches for stale cache entry, serve from cache
Expand All @@ -332,7 +332,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
// Reaching here implies cache miss
c.cacheStats.incMiss()

bkReader, bkErr := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
bkReader, bkErr := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)

if bkErr != nil {
return bkReader, bkErr
Expand All @@ -359,7 +359,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
// if range caching is disabled, download entire object.
rs = nil
// fill cache in the background for range GET requests
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, rs, h, lockType, opts)
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, rs, h, opts)
if bErr != nil {
return
}
Expand Down Expand Up @@ -713,7 +713,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
if err == nil {
go func() {
// fill cache in the background
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, ObjectOptions{})
if bErr != nil {
return
}
Expand Down Expand Up @@ -857,8 +857,8 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
InnerGetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
},
InnerGetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
InnerGetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, opts)
},
InnerDeleteObjectFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().DeleteObject(ctx, bucket, object, opts)
Expand Down Expand Up @@ -1143,7 +1143,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
End: startOffset + length,
}
// fill cache in the background
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, srcBucket, srcObject, rs, http.Header{}, readLock, ObjectOptions{})
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, srcBucket, srcObject, rs, http.Header{}, ObjectOptions{})
if bErr != nil {
return
}
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
_, err := dcache.CompleteMultipartUpload(bgContext(ctx), bucket, object, uploadID, uploadedParts, oi, opts)
if err != nil {
// fill cache in the background
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, ObjectOptions{})
if bErr != nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/erasure-healing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,7 +1634,7 @@ func TestHealLastDataShard(t *testing.T) {
t.Fatal(err)
}

firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{NoLock: true})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1664,7 +1664,7 @@ func TestHealLastDataShard(t *testing.T) {
t.Fatal(err)
}

secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{NoLock: true})
if err != nil {
t.Fatal(err)
}
Expand Down
53 changes: 23 additions & 30 deletions cmd/erasure-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d

// GetObjectNInfo - returns object info and an object
// Read(Closer). When err != nil, the returned reader is always nil.
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
auditObjectErasureSet(ctx, object, &er)

// This is a special call attempted first to check for SOS-API calls.
Expand All @@ -194,34 +194,29 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
// reset any error to 'nil'
err = nil

var unlockOnDefer bool
nsUnlocker := func() {}
defer func() {
if unlockOnDefer {
nsUnlocker()
}
}()

// Acquire lock
if lockType != noLock {
if !opts.NoLock {
lock := er.NewNSLock(bucket, object)
switch lockType {
case writeLock:
lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.Unlock(lkctx) }
case readLock:
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.RUnlock(lkctx) }
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
unlockOnDefer = true
ctx = lkctx.Context()

// Release lock when the metadata is verified, and reader
// is ready to be read.
//
// This is possible to be lock free because
// - dataDir for any given version is unique, so new writes
// do not update an existing dataDir, allowing us to be lock
// free henceforth.
// - xl.meta for inlined objects has already read the data
// into memory, any mutation on xl.meta subsequently is
// inconsequential to the overall read operation.
// - xl.meta metadata is still verified for quorum under lock()
// however writing the response doesn't need to serialize
// concurrent writers
defer lock.RUnlock(lkctx)
}

fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
Expand Down Expand Up @@ -270,15 +265,13 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
if err != nil {
return nil, err
}
unlockOnDefer = false
return gr.WithCleanupFuncs(nsUnlocker), nil
return gr, nil
}

fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
if err != nil {
return nil, err
}
unlockOnDefer = false

pr, pw := xioutil.WaitPipe()
go func() {
Expand All @@ -291,7 +284,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
pr.CloseWithError(nil)
}

return fn(pr, h, pipeCloser, nsUnlocker)
return fn(pr, h, pipeCloser)
}

func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
Expand Down
12 changes: 6 additions & 6 deletions cmd/erasure-object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
}
}

gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, opts)
if err != nil {
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
}
z.serverPools[0].erasureDisksMu.Unlock()
// Fetch object from store.
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, opts)
if err != nil {
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
Expand Down Expand Up @@ -831,7 +831,7 @@ func TestPutObjectSmallInlineData(t *testing.T) {
if err != nil {
t.Fatal(err)
}
gr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, ObjectOptions{})
gr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{})
if err != nil {
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
}
Expand All @@ -855,7 +855,7 @@ func TestPutObjectSmallInlineData(t *testing.T) {
if err != nil {
t.Fatal(err)
}
gr, err = obj.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, ObjectOptions{})
gr, err = obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{})
if err != nil {
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
}
Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestGetObjectInlineNotInline(t *testing.T) {
}

// Try to read the object and check its md5sum
gr, err := objLayer.GetObjectNInfo(ctx, "testbucket", "file", nil, nil, readLock, ObjectOptions{})
gr, err := objLayer.GetObjectNInfo(ctx, "testbucket", "file", nil, nil, ObjectOptions{})
if err != nil {
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
}
Expand Down Expand Up @@ -1193,7 +1193,7 @@ func TestGetObjectWithOutdatedDisks(t *testing.T) {
sets.erasureDisksMu.Lock()
xl.getDisks = func() []StorageAPI { return origErasureDisks }
sets.erasureDisksMu.Unlock()
gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, readLock, ObjectOptions{VersionID: got.VersionID})
gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, ObjectOptions{VersionID: got.VersionID})
if err != nil {
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/erasure-server-pool-decom.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,10 +838,10 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
encodeDirObject(version.Name),
nil,
http.Header{},
noLock, // all mutations are blocked reads are safe without locks.
ObjectOptions{
VersionID: version.VersionID,
NoDecryption: true,
NoLock: true,
})
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// object deleted by the application, nothing to do here we move on.
Expand Down
2 changes: 1 addition & 1 deletion cmd/erasure-server-pool-rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,10 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
encodeDirObject(version.Name),
nil,
http.Header{},
noLock, // all mutations are blocked reads are safe without locks.
ObjectOptions{
VersionID: version.VersionID,
NoDecryption: true,
NoLock: true,
})
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
// object deleted by the application, nothing to do here we move on.
Expand Down

0 comments on commit 309308b

Please sign in to comment.