Skip to content

Commit

Permalink
malloc: optimizations (#16037) (#16039)
Browse files Browse the repository at this point in the history
malloc: refine cache size calculation

malloc: remove AllocTyped

malloc: remove eviction

malloc: add shard alloc and free statistics

malloc: flush cached objects when idle

Approved by: @zhangxu19830126, @sukki37
  • Loading branch information
reusee committed May 13, 2024
1 parent 90a2947 commit 22194fc
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 78 deletions.
18 changes: 0 additions & 18 deletions pkg/common/malloc/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,3 @@ func BenchmarkParallelAllocFree(b *testing.B) {
}
})
}

func BenchmarkAllocTyped(b *testing.B) {
for i := 0; i < b.N; i++ {
var f *float64
handle := AllocTyped(&f)
handle.Free()
}
}

func BenchmarkParallelAllocTyped(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var f *float64
handle := AllocTyped(&f)
handle.Free()
}
})
}
46 changes: 46 additions & 0 deletions pkg/common/malloc/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package malloc

import "time"

func init() {
go func() {
lastNumAllocs := make([]int64, numShards)
for range time.NewTicker(time.Second * 37).C {
for i := 0; i < numShards; i++ {
numAllocs := shards[i].numAlloc.Load()
if numAllocs == lastNumAllocs[i] {
// not active, flush
shards[i].flush()
}
lastNumAllocs[i] = numAllocs
}
}
}()
}

func (s *Shard) flush() {
for _, ch := range s.pools {
loop:
for {
select {
case <-ch:
default:
break loop
}
}
}
}
7 changes: 3 additions & 4 deletions pkg/common/malloc/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ func (h *Handle) Free() {
}
pid := runtime_procPin()
runtime_procUnpin()
if pid >= len(shards) {
pid = 0
}
shard := pid % numShards
select {
case shards[pid][h.class] <- h:
case shards[shard].pools[h.class] <- h:
shards[shard].numFree.Add(1)
default:
}
}
82 changes: 26 additions & 56 deletions pkg/common/malloc/malloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,30 @@
package malloc

import (
"runtime"
"time"
"sync/atomic"
"unsafe"

"github.com/matrixorigin/matrixone/pkg/logutil"
"go.uber.org/zap"
)

const (
maxBufferSize = 1 << 31
maxBufferSize = 1 << 30
minClassSize = 128
maxClassSize = 1 << 20
classSizeFactor = 1.5
numShards = 16
)

var (
numShards = runtime.GOMAXPROCS(-1)
evictAllDuration = time.Hour * 7
minEvictInterval = time.Millisecond * 100
type Shard struct {
numAlloc atomic.Int64
numFree atomic.Int64
pools []chan *Handle
}

var (
bufferedObjectsPerClass = func() int {
n := maxBufferSize / numShards / len(classSizes) / ((minClassSize + maxClassSize) / 2)
if n < 8 {
n = 8
}
n := maxBufferSize / numShards / classSumSize
logutil.Info("malloc",
zap.Any("max buffer size", maxBufferSize),
zap.Any("num shards", numShards),
Expand All @@ -58,41 +57,26 @@ var (
return
}()

shards = func() (ret [][]chan *Handle) {
for i := 0; i < numShards; i++ {
var shard []chan *Handle
for range classSizes {
shard = append(shard, make(chan *Handle, bufferedObjectsPerClass))
}
ret = append(ret, shard)
classSumSize = func() (ret int) {
for _, size := range classSizes {
ret += size
}
return
}()
)

func init() {
// evict
go func() {
evictOne := func() {
for _, shard := range shards {
for i := len(shard) - 1; i >= 0; i-- {
select {
case <-shard[i]:
return
default:
}
}
shards = func() (ret []Shard) {
ret = make([]Shard, numShards)
for i := 0; i < numShards; i++ {
for range classSizes {
ret[i].pools = append(
ret[i].pools,
make(chan *Handle, bufferedObjectsPerClass),
)
}
}
interval := evictAllDuration / time.Duration(numShards*len(classSizes)*bufferedObjectsPerClass)
if interval < minEvictInterval {
interval = minEvictInterval
}
for range time.NewTicker(interval).C {
evictOne()
}
return
}()
}
)

func requestSizeToClass(size int) int {
for class, classSize := range classSizes {
Expand All @@ -106,11 +90,10 @@ func requestSizeToClass(size int) int {
func classAllocate(class int) *Handle {
pid := runtime_procPin()
runtime_procUnpin()
if pid >= len(shards) {
pid = 0
}
shard := pid % numShards
select {
case handle := <-shards[pid][class]:
case handle := <-shards[shard].pools[class]:
shards[shard].numAlloc.Add(1)
clear(unsafe.Slice((*byte)(handle.ptr), classSizes[handle.class]))
return handle
default:
Expand All @@ -134,19 +117,6 @@ func Alloc(n int) (unsafe.Pointer, *Handle) {
return handle.ptr, handle
}

func AllocTyped[T any](target **T) *Handle {
var t T
size := unsafe.Sizeof(t)
class := requestSizeToClass(int(size))
if class == -1 {
*target = new(T)
return dumbHandle
}
handle := classAllocate(class)
*target = (*T)(handle.ptr)
return handle
}

//go:linkname runtime_procPin runtime.procPin
func runtime_procPin() int

Expand Down

0 comments on commit 22194fc

Please sign in to comment.