Skip to content

Commit

Permalink
remove invalid lock table bind (#12713)
Browse files Browse the repository at this point in the history
remove invalid lock table bind in current cn, if commit failed and meet
lock table changed error.
  • Loading branch information
zhangxu19830126 committed Nov 13, 2023
1 parent e5bada4 commit 90dac68
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 105 deletions.
7 changes: 4 additions & 3 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (l *lockTableAllocator) KeepLockTableBind(serviceID string) bool {
return b.active()
}

func (l *lockTableAllocator) Valid(binds []pb.LockTable) bool {
func (l *lockTableAllocator) Valid(binds []pb.LockTable) []uint64 {
var invalid []uint64
l.mu.RLock()
defer l.mu.RUnlock()
for _, b := range binds {
Expand All @@ -103,10 +104,10 @@ func (l *lockTableAllocator) Valid(binds []pb.LockTable) bool {
zap.String("current", current.DebugString()),
zap.String("received", b.DebugString()))
}
return false
invalid = append(invalid, b.Table)
}
}
return true
return invalid
}

func (l *lockTableAllocator) Close() error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/lockservice/lock_table_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestValid(t *testing.T) {
time.Hour,
func(a *lockTableAllocator) {
b := a.Get("s1", 1)
assert.True(t, a.Valid([]pb.LockTable{b}))
assert.Empty(t, a.Valid([]pb.LockTable{b}))
})
}

Expand All @@ -182,7 +182,7 @@ func TestValidWithServiceInvalid(t *testing.T) {
func(a *lockTableAllocator) {
b := a.Get("s1", 1)
b.ServiceID = "s2"
assert.False(t, a.Valid([]pb.LockTable{b}))
assert.NotEmpty(t, a.Valid([]pb.LockTable{b}))
})
}

Expand All @@ -193,7 +193,7 @@ func TestValidWithVersionChanged(t *testing.T) {
func(a *lockTableAllocator) {
b := a.Get("s1", 1)
b.Version++
assert.False(t, a.Valid([]pb.LockTable{b}))
assert.NotEmpty(t, a.Valid([]pb.LockTable{b}))
})
}

Expand Down
24 changes: 21 additions & 3 deletions pkg/lockservice/service_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,25 @@ func (s *service) GetWaitingList(
return true, waitingList, nil
}

func (s *service) ForceRefreshLockTableBinds() {
func (s *service) ForceRefreshLockTableBinds(targets ...uint64) {
contains := func(id uint64) bool {
if len(targets) == 0 {
return true
}
for _, v := range targets {
if v == id {
return true
}
}
return false
}

s.tables.Range(func(key, value any) bool {
value.(lockTable).close()
s.tables.Delete(key)
id := key.(uint64)
if contains(id) {
value.(lockTable).close()
s.tables.Delete(key)
}
return true
})
}
Expand All @@ -62,6 +77,9 @@ func (s *service) GetLockTableBind(tableID uint64) (pb.LockTable, error) {
if err != nil {
return pb.LockTable{}, err
}
if l == nil {
return pb.LockTable{}, nil
}
return l.getBind(), nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/lockservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type LockService interface {
// GetWaitingList get special txnID's waiting list
GetWaitingList(ctx context.Context, txnID []byte) (bool, []pb.WaitTxn, error)
// ForceRefreshLockTableBinds force refresh all lock tables binds
ForceRefreshLockTableBinds()
ForceRefreshLockTableBinds(targets ...uint64)
// GetLockTableBind returns lock table bind
GetLockTableBind(tableID uint64) (pb.LockTable, error)
// IterLocks iter all locks on current lock service. len(keys) == 2 if is range lock,
Expand Down Expand Up @@ -168,7 +168,7 @@ type LockTableAllocator interface {
// period of time to maintain the binding, the binding will become invalid.
KeepLockTableBind(serviceID string) bool
// Valid check for changes in the binding relationship of a specific locktable.
Valid(binds []pb.LockTable) bool
Valid(binds []pb.LockTable) []uint64
// Close close the lock table allocator
Close() error
}
Expand Down
276 changes: 193 additions & 83 deletions pkg/pb/txn/txn.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/txn/client/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,11 @@ func (tc *txnOperator) handleErrorResponse(resp txn.TxnResponse) error {
return err
}

// commit failed, refresh invalid lock tables
if err != nil && moerr.IsMoErrCode(err, moerr.ErrLockTableBindChanged) {
tc.option.lockService.ForceRefreshLockTableBinds(resp.CommitResponse.InvalidLockTables...)
}

v, ok := moruntime.ProcessLevelRuntime().GetGlobalVariables(moruntime.EnableCheckInvalidRCErrors)
if ok && v.(bool) {
if moerr.IsMoErrCode(err, moerr.ErrTxnWWConflict) ||
Expand Down
44 changes: 44 additions & 0 deletions pkg/txn/client/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,50 @@ func TestCommitWithLockTables(t *testing.T) {
})
}

func TestCommitWithLockTablesChanged(t *testing.T) {
runOperatorTests(t, func(ctx context.Context, tc *txnOperator, ts *testTxnSender) {
lockservice.RunLockServicesForTest(
zap.DebugLevel,
[]string{"s1"},
time.Second,
func(lta lockservice.LockTableAllocator, ls []lockservice.LockService) {
s := ls[0]

_, err := s.Lock(ctx, 1, [][]byte{[]byte("k1")}, tc.txnID, lock.LockOptions{})
assert.NoError(t, err)
_, err = s.Lock(ctx, 2, [][]byte{[]byte("k1")}, tc.txnID, lock.LockOptions{})
assert.NoError(t, err)

ts.setManual(func(sr *rpc.SendResult, err error) (*rpc.SendResult, error) {
sr.Responses[0].TxnError = txn.WrapError(moerr.NewLockTableBindChanged(ctx), 0)
sr.Responses[0].CommitResponse = &txn.TxnCommitResponse{
InvalidLockTables: []uint64{1},
}
return sr, nil
})

tc.mu.txn.Mode = txn.TxnMode_Pessimistic
tc.option.lockService = s
tc.AddLockTable(lock.LockTable{Table: 1})
tc.AddLockTable(lock.LockTable{Table: 2})
tc.mu.txn.TNShards = append(tc.mu.txn.TNShards, metadata.TNShard{TNShardRecord: metadata.TNShardRecord{ShardID: 1}})
err = tc.Commit(ctx)
assert.Error(t, err)

// table 1 will be removed
bind, err := s.GetLockTableBind(1)
require.NoError(t, err)
require.Equal(t, lock.LockTable{}, bind)

// table 2 will be kept
bind, err = s.GetLockTableBind(2)
require.NoError(t, err)
require.NotEqual(t, lock.LockTable{}, bind)
},
nil)
})
}

func TestContextWithoutDeadlineWillPanic(t *testing.T) {
runOperatorTests(t, func(_ context.Context, tc *txnOperator, _ *testTxnSender) {
defer func() {
Expand Down
11 changes: 7 additions & 4 deletions pkg/txn/service/service_cn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,13 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response
s.logger.Fatal("commit with empty tn shards")
}

if len(request.Txn.LockTables) > 0 &&
!s.allocator.Valid(request.Txn.LockTables) {
response.TxnError = txn.WrapError(moerr.NewLockTableBindChanged(ctx), 0)
return nil
if len(request.Txn.LockTables) > 0 {
invalidBinds := s.allocator.Valid(request.Txn.LockTables)
if len(invalidBinds) > 0 {
response.CommitResponse.InvalidLockTables = invalidBinds
response.TxnError = txn.WrapError(moerr.NewLockTableBindChanged(ctx), 0)
return nil
}
}

txnID := request.Txn.ID
Expand Down
12 changes: 5 additions & 7 deletions pkg/txn/service/service_cn_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestReadBasic(t *testing.T) {
checkReadResponses(t, resp, "")
}

func TestReadWithTNShartnotMatch(t *testing.T) {
func TestReadWithTNShardNotMatch(t *testing.T) {
sender := NewTestSender()
defer func() {
assert.NoError(t, sender.Close())
Expand All @@ -79,7 +79,6 @@ func TestReadWithTNShartnotMatch(t *testing.T) {
resp := readTestData(t, sender, 1, rTxn, 1)
checkResponses(t, resp,
txn.WrapError(moerr.NewTNShardNotFound(context.TODO(), "", 1), 0))
// newTxnError(moerr.ErrDNShardNotFound, "txn not active"))
}

func TestReadWithSelfWrite(t *testing.T) {
Expand Down Expand Up @@ -128,7 +127,7 @@ func TestReadBlockWithClock(t *testing.T) {
assert.Equal(t, int64(3), ts)
}

func TestReadCannotBlockByUncomitted(t *testing.T) {
func TestReadCannotBlockByUncommitted(t *testing.T) {
sender := NewTestSender()
defer func() {
assert.NoError(t, sender.Close())
Expand Down Expand Up @@ -428,7 +427,7 @@ func TestWriteBasic(t *testing.T) {
assert.Equal(t, GetTestValue(1, wTxn), v)
}

func TestWriteWithTNShartnotMatch(t *testing.T) {
func TestWriteWithTNShardNotMatch(t *testing.T) {
sender := NewTestSender()
defer func() {
assert.NoError(t, sender.Close())
Expand Down Expand Up @@ -471,7 +470,6 @@ func TestWriteWithWWConflict(t *testing.T) {
wTxn2 := NewTestTxn(2, 1)
checkResponses(t, writeTestData(t, sender, 1, wTxn2, 1),
txn.WrapError(moerr.NewTAEWrite(context.TODO()), 0))
// newTxnError(moerr.ErrTAEWrite, "write conlict"))
}

func TestCommitWithSingleTNShard(t *testing.T) {
Expand Down Expand Up @@ -509,7 +507,7 @@ func TestCommitWithSingleTNShard(t *testing.T) {
}
}

func TestCommitWithTNShartnotMatch(t *testing.T) {
func TestCommitWithTNShardNotMatch(t *testing.T) {
sender := NewTestSender()
defer func() {
assert.NoError(t, sender.Close())
Expand Down Expand Up @@ -739,7 +737,7 @@ func TestRollback(t *testing.T) {
checkData(t, wTxn, s2, 2, 0, false)
}

func TestRollbackWithTNShartnotFound(t *testing.T) {
func TestRollbackWithTNShardNotFound(t *testing.T) {
sender := NewTestSender()
defer func() {
assert.NoError(t, sender.Close())
Expand Down
1 change: 1 addition & 0 deletions proto/txn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ message TxnCommitRequest {

// TxnCommitResponse response of TxnCommitRequest.
message TxnCommitResponse {
repeated uint64 InvalidLockTables = 1;
}

// TxnCommitRequest CN sent the rollback request to coordinator TN.
Expand Down

0 comments on commit 90dac68

Please sign in to comment.