Merge branch 'main' into main-3231

mergify[bot] committed May 11, 2024
2 parents fbfa07d + 65dc0d7 commit 1ac3e04
Showing 9 changed files with 1,834 additions and 74 deletions.
86 changes: 48 additions & 38 deletions pkg/vm/engine/disttae/db.go
@@ -393,48 +393,33 @@ func (e *Engine) getOrCreateSnapCatalogCache(
 func (e *Engine) getOrCreateSnapPart(
 	ctx context.Context,
-	databaseId uint64,
-	dbName string,
-	tableId uint64,
-	tblName string,
-	primaySeqnum int,
+	tbl *txnTable,
 	ts types.TS) (*logtailreplay.Partition, error) {
 
-	//First, check whether the latest partition is available.
-	e.Lock()
-	partition, ok := e.partitions[[2]uint64{databaseId, tableId}]
-	e.Unlock()
-	if ok && partition.CanServe(ts) {
-		return partition, nil
-	}
-
-	//Then, check whether the snapshot partitions is available.
+	//check whether the snapshot partitions are available for reuse.
 	e.mu.Lock()
-	snaps, ok := e.mu.snapParts[[2]uint64{databaseId, tableId}]
+	tblSnaps, ok := e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}]
 	if !ok {
-		e.mu.snapParts[[2]uint64{databaseId, tableId}] = &struct {
+		e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}] = &struct {
 			sync.Mutex
 			snaps []*logtailreplay.Partition
 		}{}
-		snaps = e.mu.snapParts[[2]uint64{databaseId, tableId}]
+		tblSnaps = e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}]
 	}
 	e.mu.Unlock()
 
-	snaps.Lock()
-	defer snaps.Unlock()
-	for _, snap := range snaps.snaps {
+	tblSnaps.Lock()
+	defer tblSnaps.Unlock()
+	for _, snap := range tblSnaps.snaps {
 		if snap.CanServe(ts) {
 			return snap, nil
 		}
 	}
 
 	//new snapshot partition and apply checkpoints into it.
 	snap := logtailreplay.NewPartition()
 	//TODO::if tableId is mo_tables, or mo_colunms, or mo_database,
 	// we should init the partition,ref to engine.init
-	ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.fs, ts, tableId, nil)
-	if ckps == nil {
-		return nil, moerr.NewInternalErrorNoCtx("No checkpoints for snapshot read")
-	}
+	ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.fs, ts, tbl.tableId, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -448,10 +433,10 @@ func (e *Engine) getOrCreateSnapPart(
 		entries, closeCBs, err := logtail.LoadCheckpointEntries(
 			ctx,
 			locations,
-			tableId,
-			tblName,
-			databaseId,
-			dbName,
+			tbl.tableId,
+			tbl.tableName,
+			tbl.db.databaseId,
+			tbl.db.databaseName,
 			e.mp,
 			e.fs)
 		if err != nil {
@@ -463,26 +448,46 @@ func (e *Engine) getOrCreateSnapPart(
 			}
 		}()
 		for _, entry := range entries {
-			if err = consumeEntry(ctx, primaySeqnum, e, nil, state, entry); err != nil {
+			if err = consumeEntry(
+				ctx,
+				tbl.primarySeqnum,
+				e,
+				nil,
+				state,
+				entry); err != nil {
 				return err
 			}
 		}
 		return nil
 	})
+	if snap.CanServe(ts) {
+		tblSnaps.snaps = append(tblSnaps.snaps, snap)
+		return snap, nil
+	}
+
 	start, end := snap.GetDuration()
-	if ts.Greater(&end) || ts.Less(&start) {
+	//if has no checkpoints or ts > snap.end, use latest partition.
+	if snap.IsEmpty() || ts.Greater(&end) {
+		err := tbl.updateLogtail(ctx)
+		if err != nil {
+			return nil, err
+		}
+		return e.getOrCreateLatestPart(tbl.db.databaseId, tbl.tableId), nil
+	}
+	if ts.Less(&start) {
 		return nil, moerr.NewInternalErrorNoCtx(
-			"Invalid checkpoints for snapshot read,snapshot:%s, start:%s, end:%s",
+			"No valid checkpoints for snapshot read,maybe snapshot is too old, "+
+				"snapshot:%s, start:%s, end:%s",
 			ts.ToTimestamp().DebugString(),
 			start.ToTimestamp().DebugString(),
 			end.ToTimestamp().DebugString())
 	}
-	snaps.snaps = append(snaps.snaps, snap)
-
-	return snap, nil
+	panic("impossible path")
 }
 
-func (e *Engine) getOrCreateLatestPart(databaseId, tableId uint64) *logtailreplay.Partition {
+func (e *Engine) getOrCreateLatestPart(
+	databaseId,
+	tableId uint64) *logtailreplay.Partition {
 	e.Lock()
 	defer e.Unlock()
 	partition, ok := e.partitions[[2]uint64{databaseId, tableId}]
@@ -493,7 +498,9 @@ func (e *Engine) getOrCreateLatestPart(databaseId, tableId uint64) *logtailrepla
 	return partition
 }
 
-func (e *Engine) lazyLoadLatestCkp(ctx context.Context, tbl *txnTable) (*logtailreplay.Partition, error) {
+func (e *Engine) lazyLoadLatestCkp(
+	ctx context.Context,
+	tbl *txnTable) (*logtailreplay.Partition, error) {
 	part := e.getOrCreateLatestPart(tbl.db.databaseId, tbl.tableId)
 	cache := e.getLatestCatalogCache()
 
@@ -531,6 +538,9 @@ func (e *Engine) lazyLoadLatestCkp(ctx context.Context, tbl *txnTable) (*logtail
 	return part, nil
 }
 
-func (e *Engine) UpdateOfPush(ctx context.Context, databaseId, tableId uint64, ts timestamp.Timestamp) error {
+func (e *Engine) UpdateOfPush(
+	ctx context.Context,
+	databaseId,
+	tableId uint64, ts timestamp.Timestamp) error {
 	return e.pClient.TryToSubscribeTable(ctx, databaseId, tableId)
 }
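
The getOrCreateSnapPart rewrite above boils down to: look up a cached snapshot partition keyed by {databaseId, tableId} that can serve the read timestamp, otherwise replay checkpoints into a fresh partition and cache it, and fall back to the latest partition when the checkpoints do not cover the timestamp. The sketch below illustrates only that lookup-or-build pattern; Engine, Partition, TS, and buildFromCheckpoints are simplified stand-ins, not the real disttae/logtailreplay types.

```go
package main

import (
	"fmt"
	"sync"
)

// TS stands in for a commit timestamp; the real engine uses types.TS.
type TS int64

// Partition is a stand-in for logtailreplay.Partition: it can serve reads
// whose timestamp falls inside the [start, end] range it has replayed.
type Partition struct {
	start, end TS
}

func (p *Partition) CanServe(ts TS) bool { return ts >= p.start && ts <= p.end }

// Engine caches snapshot partitions per {databaseId, tableId}, mirroring
// the e.mu.snapParts map touched by this diff.
type Engine struct {
	mu        sync.Mutex
	snapParts map[[2]uint64][]*Partition
}

// buildFromCheckpoints stands in for replaying checkpoint entries; here it
// simply fabricates a partition covering [0, ts].
func (e *Engine) buildFromCheckpoints(ts TS) *Partition {
	return &Partition{start: 0, end: ts}
}

// getOrCreateSnapPart reuses a cached snapshot partition that can serve ts,
// otherwise builds one and caches it; when the build cannot cover ts the real
// code falls back to the latest partition (represented by an error here).
func (e *Engine) getOrCreateSnapPart(dbID, tblID uint64, ts TS) (*Partition, error) {
	key := [2]uint64{dbID, tblID}

	e.mu.Lock()
	for _, snap := range e.snapParts[key] {
		if snap.CanServe(ts) {
			e.mu.Unlock()
			return snap, nil
		}
	}
	e.mu.Unlock()

	snap := e.buildFromCheckpoints(ts)
	if snap.CanServe(ts) {
		e.mu.Lock()
		e.snapParts[key] = append(e.snapParts[key], snap)
		e.mu.Unlock()
		return snap, nil
	}
	return nil, fmt.Errorf("snapshot ts %d not covered; fall back to latest partition", ts)
}

func main() {
	e := &Engine{snapParts: map[[2]uint64][]*Partition{}}
	p, err := e.getOrCreateSnapPart(1, 42, 5)
	fmt.Println(p, err) // &{0 5} <nil>
	p2, _ := e.getOrCreateSnapPart(1, 42, 3)
	fmt.Println(p == p2) // true: the cached partition is reused
}
```

Caching only partitions whose replayed duration actually covers ts is what lets later reads at nearby timestamps reuse them instead of replaying checkpoints again.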
3 changes: 0 additions & 3 deletions pkg/vm/engine/disttae/engine.go
@@ -273,9 +273,6 @@ func (e *Engine) Database(ctx context.Context, name string,
 	if !txn.op.IsSnapOp() {
 		catalog = e.getLatestCatalogCache()
 	} else {
-		if name == "test" {
-			logutil.Infof("xxxx Database-getOrCreateSnapCatalogCache: txn:%s", txn.op.Txn().DebugString())
-		}
 		catalog, err = e.getOrCreateSnapCatalogCache(
 			ctx,
 			types.TimestampToTS(txn.op.SnapshotTS()))
19 changes: 13 additions & 6 deletions pkg/vm/engine/disttae/logtailreplay/partition.go
@@ -20,7 +20,7 @@ import (
"sync"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"

"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -107,12 +107,18 @@ func (p *Partition) Unlock() {
 	p.lock <- struct{}{}
 }
 
-func (p *Partition) checkValid() bool {
+func (p *Partition) IsValid() bool {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	return p.mu.start.LessEq(&p.mu.end)
 }
 
+func (p *Partition) IsEmpty() bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.mu.start == types.MaxTs()
+}
+
 func (p *Partition) UpdateStart(ts types.TS) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
@@ -145,6 +151,9 @@ func (p *Partition) ConsumeSnapCkps(
 ) (
 	err error,
 ) {
+	if len(ckps) == 0 {
+		return nil
+	}
 	//Notice that checkpoints must contain only one or zero global checkpoint
 	//followed by zero or multi continuous incremental checkpoints.
 	state := p.state.Load()
@@ -173,10 +182,9 @@ func (p *Partition) ConsumeSnapCkps(
 		end = start
 	}
 	p.UpdateDuration(start, end)
-	if !p.checkValid() {
-		panic("invalid checkpoint")
+	if !p.IsValid() {
+		return moerr.NewInternalErrorNoCtx("invalid checkpoints duration")
 	}
-
 	return nil
 }

@@ -207,7 +215,6 @@ func (p *Partition) ConsumeCheckpoints(

 	curState = p.state.Load()
 	if len(curState.checkpoints) == 0 {
-		logutil.Infof("xxxx impossible path")
 		p.UpdateDuration(types.TS{}, types.MaxTs())
 		return nil
 	}
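
The partition.go changes above add an IsEmpty check (the start timestamp is still the MaxTs sentinel, i.e. nothing has been consumed yet), rename checkValid to IsValid, and turn the invalid-duration panic into an error. A minimal sketch of that duration bookkeeping is below; TS and Partition are stand-ins for the real logtailreplay types.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// TS is a simplified stand-in for types.TS; the real code uses types.MaxTs()
// as the "nothing consumed yet" sentinel for the start of the duration.
type TS int64

const maxTS = TS(math.MaxInt64)

// Partition tracks the [start, end] duration covered by consumed checkpoints.
type Partition struct {
	start, end TS
}

// NewPartition starts with start == maxTS, which is exactly what IsEmpty checks.
func NewPartition() *Partition {
	return &Partition{start: maxTS, end: maxTS}
}

func (p *Partition) IsEmpty() bool { return p.start == maxTS }
func (p *Partition) IsValid() bool { return p.start <= p.end }

// ConsumeSnapCkps mirrors the new control flow: no checkpoints is a no-op,
// and an inverted duration is reported as an error instead of a panic.
func (p *Partition) ConsumeSnapCkps(durations [][2]TS) error {
	if len(durations) == 0 {
		return nil
	}
	p.start, p.end = durations[0][0], durations[len(durations)-1][1]
	if !p.IsValid() {
		return errors.New("invalid checkpoints duration")
	}
	return nil
}

func main() {
	p := NewPartition()
	fmt.Println(p.IsEmpty())                        // true: nothing consumed yet
	fmt.Println(p.ConsumeSnapCkps([][2]TS{{1, 9}})) // <nil>
	fmt.Println(p.IsEmpty(), p.IsValid())           // false true
}
```

Returning early on an empty checkpoint list matches the new guard at the top of the real ConsumeSnapCkps, and returning an error instead of panicking lets getOrCreateSnapPart surface the problem to the caller.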
1 change: 0 additions & 1 deletion pkg/vm/engine/disttae/txn.go
@@ -980,7 +980,6 @@ func (txn *Transaction) forEachTableWrites(databaseId uint64, tableId uint64, of
 // getCachedTable returns the cached table in this transaction if it exists, nil otherwise.
 // Before it gets the cached table, it checks whether the table is deleted by another
 // transaction by go through the delete tables slice, and advance its cachedIndex.
-// TODO::get snapshot table from cache for snapshot read
 func (txn *Transaction) getCachedTable(
 	ctx context.Context,
 	k tableKey,
33 changes: 8 additions & 25 deletions pkg/vm/engine/disttae/txn_table.go
@@ -104,33 +104,19 @@ func (tbl *txnTable) stats(ctx context.Context) (*pb.StatsInfo, error) {
 		logutil.Errorf("failed to unmarshal partition table: %v", err)
 		return nil, err
 	}
-	var cataChe *cache.CatalogCache
-	if !tbl.db.op.IsSnapOp() {
-		cataChe = e.getLatestCatalogCache()
-	} else {
-		cataChe, err = e.getOrCreateSnapCatalogCache(
-			ctx,
-			types.TimestampToTS(tbl.db.op.SnapshotTS()))
+	for _, partitionTableName := range partitionInfo.PartitionTableNames {
+		partitionTable, err := tbl.db.Relation(ctx, partitionTableName, nil)
 		if err != nil {
 			return nil, err
 		}
-	}
-	for _, partitionTableName := range partitionInfo.PartitionTableNames {
-		partitionTable := cataChe.GetTableByName(
-			tbl.db.databaseId, partitionTableName)
-		partitionsTableDef = append(partitionsTableDef, partitionTable.TableDef)
-
+		partitionsTableDef = append(partitionsTableDef, partitionTable.(*txnTable).tableDef)
 		var ps *logtailreplay.PartitionState
 		if !tbl.db.op.IsSnapOp() {
-			ps = e.getOrCreateLatestPart(tbl.db.databaseId, partitionTable.Id).Snapshot()
+			ps = e.getOrCreateLatestPart(tbl.db.databaseId, partitionTable.(*txnTable).tableId).Snapshot()
 		} else {
 			p, err := e.getOrCreateSnapPart(
 				ctx,
-				tbl.db.databaseId,
-				partitionTable.TableDef.GetDbName(),
-				partitionTable.Id,
-				partitionTable.TableDef.GetName(),
-				partitionTable.PrimarySeqnum,
+				partitionTable.(*txnTable),
 				types.TimestampToTS(tbl.db.op.SnapshotTS()),
 			)
 			if err != nil {
@@ -2253,12 +2239,9 @@ func (tbl *txnTable) getPartitionState(

 	// for snapshot txnOp
 	if tbl._partState.Load() == nil {
-		p, err := tbl.getTxn().engine.getOrCreateSnapPart(ctx,
-			tbl.db.databaseId,
-			tbl.db.databaseName,
-			tbl.tableId,
-			tbl.tableName,
-			tbl.primarySeqnum,
+		p, err := tbl.getTxn().engine.getOrCreateSnapPart(
+			ctx,
+			tbl,
 			types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
 		if err != nil {
 			return nil, err
1 change: 0 additions & 1 deletion pkg/vm/engine/disttae/types.go
@@ -192,7 +192,6 @@ type Transaction struct {
 	rowId [6]uint32
 	segId types.Uuid
 	// use to cache opened snapshot tables by current txn.
-	//TODO::cache snapshot tables for snapshot read.
 	tableCache struct {
 		cachedIndex int
 		tableMap    *sync.Map
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/db.go
@@ -120,6 +120,7 @@ func (db *DB) ForceCheckpoint(
 	default:
 		err = db.BGCheckpointRunner.ForceIncrementalCheckpoint(ts, true)
 		if dbutils.IsRetrieableCheckpoint(err) {
+			db.BGCheckpointRunner.CleanPenddingCheckpoint()
 			interval := flushDuration.Milliseconds() / 400
 			time.Sleep(time.Duration(interval))
 			break
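
The one-line addition to ForceCheckpoint clears the pending checkpoint before the retriable-error path sleeps and retries. A hedged sketch of that retry behaviour follows; runner and errRetriable are stubs, the real code calls BGCheckpointRunner.CleanPenddingCheckpoint and checks dbutils.IsRetrieableCheckpoint inside a switch, which this sketch collapses into a plain retry loop.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRetriable stands in for whatever dbutils.IsRetrieableCheckpoint matches.
var errRetriable = errors.New("retriable checkpoint error")

// runner is a stub for BGCheckpointRunner with only the two calls this diff touches.
type runner struct{ attempts int }

// ForceIncrementalCheckpoint fails with a retriable error twice, then succeeds.
func (r *runner) ForceIncrementalCheckpoint() error {
	r.attempts++
	if r.attempts < 3 {
		return errRetriable
	}
	return nil
}

// CleanPendingCheckpoint models the cleanup the diff adds before retrying, so a
// half-built pending checkpoint does not block the next attempt.
func (r *runner) CleanPendingCheckpoint() {
	fmt.Println("cleaned pending checkpoint")
}

// forceCheckpoint retries while the error is retriable, cleaning pending state
// and backing off briefly between attempts (the diff sleeps flushDuration/400 ms).
func forceCheckpoint(r *runner, flushDuration time.Duration) error {
	for {
		err := r.ForceIncrementalCheckpoint()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRetriable) {
			return err
		}
		r.CleanPendingCheckpoint()
		time.Sleep(flushDuration / 400)
	}
}

func main() {
	fmt.Println(forceCheckpoint(&runner{}, 400*time.Millisecond))
}
```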
