Skip to content

Commit

Permalink
Add epochgc (#1841)
Browse files Browse the repository at this point in the history
  • Loading branch information
daviszhen committed Mar 8, 2022
1 parent f4f8a5f commit 56fcd3f
Show file tree
Hide file tree
Showing 17 changed files with 494 additions and 105 deletions.
7 changes: 7 additions & 0 deletions cmd/db-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func removeEpoch(epoch uint64) {
if err != nil {
fmt.Printf("catalog remove ddl failed. error :%v \n", err)
}
if tpe,ok := config.StorageEngine.(*tpeEngine.TpeEngine) ; ok {
err = tpe.RemoveDeletedTable(epoch)
if err != nil {
fmt.Printf("tpeEngine remove ddl failed. error :%v \n", err)
}
}

}

func main() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/vm/engine/tpe/computation/computationHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ type ComputationHandler interface {
Read(readCtx interface{}) (*batch.Batch, error)

Write(writeCtx interface{}, bat *batch.Batch) error

RemoveDeletedTable(epoch uint64) (int, error)
}
10 changes: 8 additions & 2 deletions pkg/vm/engine/tpe/descriptor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func ExtractIndexAttributeDescIDs(attrs []*AttributeDesc) []int {
return ids
}

type EpochGCItem struct {
Epoch uint64
DbID uint64
TableID uint64
}

// DescriptorHandler loads and updates the descriptors
type DescriptorHandler interface {

Expand Down Expand Up @@ -267,6 +273,6 @@ type DescriptorHandler interface {
//StoreRelationDescIntoAsyncGC stores the table into the asyncgc table
StoreRelationDescIntoAsyncGC(epoch uint64, dbID uint64, desc *RelationDesc) error

//ListRelationDescFromAsyncGC gets all the tables from the asyncgc table
ListRelationDescFromAsyncGC(epoch uint64) ([]*RelationDesc, error)
//ListRelationDescFromAsyncGC gets all the tables need to deleted from the asyncgc table
ListRelationDescFromAsyncGC(epoch uint64) ([]EpochGCItem, error)
}
8 changes: 8 additions & 0 deletions pkg/vm/engine/tpe/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,12 @@ func (te * TpeEngine) Database(name string) (engine.Database, error) {

func (te * TpeEngine) Node(s string) *engine.NodeInfo {
return &engine.NodeInfo{Mcpu: 1}
}

func (te * TpeEngine) RemoveDeletedTable(epoch uint64) error {
_, err := te.computeHandler.RemoveDeletedTable(epoch)
if err != nil {
return err
}
return nil
}
45 changes: 0 additions & 45 deletions pkg/vm/engine/tpe/epoch/epoch.go

This file was deleted.

31 changes: 0 additions & 31 deletions pkg/vm/engine/tpe/epoch/epoch_test.go

This file was deleted.

13 changes: 9 additions & 4 deletions pkg/vm/engine/tpe/tuplecodec/computationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ComputationHandlerImpl struct {
tch *TupleCodecHandler
serializer ValueSerializer
indexHandler index.IndexHandler
epochHandler * EpochHandler
}

func (chi *ComputationHandlerImpl) Read(readCtx interface{}) (*batch.Batch, error) {
Expand Down Expand Up @@ -168,8 +169,8 @@ func (chi *ComputationHandlerImpl) GetDatabase(dbName string) (*descriptor.Datab
//callbackForGetDatabaseDesc extracts the databaseDesc
func (chi *ComputationHandlerImpl) callbackForGetDatabaseDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) {
//get the name and the desc
descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
descDI := dis[InternalDescriptorTableID_desc_ID]
descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
descDI := dis[InternalDescriptorTable_desc_ID]
if !(descDI.IsValueType(descAttr.Ttype)) {
return nil,errorTypeInValueNotEqualToTypeInAttribute
}
Expand Down Expand Up @@ -303,8 +304,8 @@ func (chi *ComputationHandlerImpl) DropTableByDesc(epoch, dbId uint64, tableDesc
//callbackForGetTableDesc extracts the tableDesc
func (chi *ComputationHandlerImpl) callbackForGetTableDesc (callbackCtx interface{},dis []*orderedcodec.DecodedItem)([]byte,error) {
//get the name and the desc
descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTableID_desc_ID]
descDI := dis[InternalDescriptorTableID_desc_ID]
descAttr := internalDescriptorTableDesc.Attributes[InternalDescriptorTable_desc_ID]
descDI := dis[InternalDescriptorTable_desc_ID]
if !(descDI.IsValueType(descAttr.Ttype)) {
return nil,errorTypeInValueNotEqualToTypeInAttribute
}
Expand Down Expand Up @@ -374,6 +375,10 @@ func (chi *ComputationHandlerImpl) GetTable(dbId uint64, name string) (*descript
return tableDesc,nil
}

func (chi *ComputationHandlerImpl) RemoveDeletedTable(epoch uint64) (int, error) {
return chi.epochHandler.RemoveDeletedTable(epoch)
}

type AttributeStateForWrite struct {
PositionInBatch int

Expand Down
24 changes: 19 additions & 5 deletions pkg/vm/engine/tpe/tuplecodec/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ const (
//holding the schema of the table
InternalDescriptorTableID uint64 = 0

InternalDescriptorTableID_parentID_ID = 0
InternalDescriptorTableID_id_ID = 1
InternalDescriptorTableID_name_ID = 2
InternalDescriptorTableID_desc_ID = 3
PrimaryIndexID uint32 = 1
InternalDescriptorTable_parentID_ID = 0
InternalDescriptorTable_id_ID = 1
InternalDescriptorTable_name_ID = 2
InternalDescriptorTable_desc_ID = 3
PrimaryIndexID uint32 = 1

//holding the epochgced table
InternalAsyncGCTableID uint64 = 1
InternalAsyncGCTable_epoch_ID = 0
InternalAsyncGCTable_dbID_ID = 1
InternalAsyncGCTable_tableID_ID = 2
InternalAsyncGCTable_desc_ID = 3

//user table id offset
UserTableIDOffset uint64 = 3
Expand Down Expand Up @@ -214,6 +218,16 @@ var (
ID: 0,
Type: orderedcodec.VALUE_TYPE_UINT64,
},
{
Name: "dbID",
ID: 1,
Type: orderedcodec.VALUE_TYPE_UINT64,
},
{
Name: "tableID",
ID: 2,
Type: orderedcodec.VALUE_TYPE_UINT64,
},
},
Impilict_attributes: nil,
Composite_attributes: nil,
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/tpe/tuplecodec/cubekv.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
errorCubeDriverIsNull = errors.New("cube driver is nil")
errorInvalidIDPool = errors.New("invalid idpool")
errorInvalidKeyValueCount = errors.New("key count != value count")
errorUnsupportedInCubeKV = errors.New("unsupported in cubekv")
)
var _ KVHandler = &CubeKV{}

Expand Down Expand Up @@ -215,6 +216,10 @@ func (ck * CubeKV) Delete(key TupleKey) error {
return ck.Cube.Delete(key)
}

func (ck *CubeKV) DeleteWithPrefix(prefix TupleKey) error {
return errorUnsupportedInCubeKV
}

// Get gets the value of the key.
// If the key does not exist, it returns the null
func (ck * CubeKV) Get(key TupleKey) (TupleValue, error) {
Expand Down

0 comments on commit 56fcd3f

Please sign in to comment.