Skip to content

Commit

Permalink
add reload on RAFT and DB aligned to handle catch up on restart
Browse files Browse the repository at this point in the history
Signed-off-by: Loic Reyreaud <[email protected]>
  • Loading branch information
reyreaud-l committed May 13, 2024
1 parent e4d4854 commit 660ca9c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 27 deletions.
2 changes: 1 addition & 1 deletion adapters/repos/db/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (db *DB) localNodeStatistics() (*models.Statistics, error) {
IsVoter: stats["is_voter"].(bool),
Open: stats["open"].(bool),
Bootstrapped: stats["bootstrapped"].(bool),
InitialLastAppliedIndex: stats["initial_last_applied_index"].(uint64),
InitialLastAppliedIndex: stats["last_store_log_applied_index"].(uint64),
DbLoaded: stats["db_loaded"].(bool),
Candidates: stats["candidates"],
Raft: raft,
Expand Down
6 changes: 4 additions & 2 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,10 @@ func (db *localDB) apply(op applyOp) error {
return fmt.Errorf("%w: %s: %w", errSchema, op.op, err)
}

if err := op.updateStore(); err != nil {
return fmt.Errorf("%w: %s: %w", errDB, op.op, err)
if !op.schemaOnly {
if err := op.updateStore(); err != nil {
return fmt.Errorf("%w: %s: %w", errDB, op.op, err)
}
}

// Always trigger the schema callback last
Expand Down
70 changes: 46 additions & 24 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ type Store struct {
mutex sync.Mutex
candidates map[string]string

// initialLastAppliedIndex represents the index of the last applied command when the store is opened.
initialLastAppliedIndex uint64
// lastAppliedIndexOnStart represents the index of the last applied command when the store is opened.
lastAppliedIndexOnStart uint64

// lastIndex atomic.Uint64

Expand Down Expand Up @@ -239,7 +239,7 @@ func (st *Store) Open(ctx context.Context) (err error) {
}

rLog := rLog{st.logStore}
st.initialLastAppliedIndex, err = rLog.LastAppliedCommand()
st.lastAppliedIndexOnStart, err = rLog.LastAppliedCommand()
if err != nil {
return fmt.Errorf("read log last command: %w", err)
}
Expand All @@ -252,19 +252,19 @@ func (st *Store) Open(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("raft.NewRaft %v %w", st.transport.LocalAddr(), err)
}
if st.initialLastAppliedIndex <= st.raft.LastIndex() {
if st.lastAppliedIndexOnStart <= st.raft.LastIndex() {
// this should include empty and non empty node
st.openDatabase(ctx)
}

st.lastAppliedIndex.Store(st.raft.AppliedIndex())

st.log.WithFields(logrus.Fields{
"raft_applied_index": st.raft.AppliedIndex(),
"raft_last_index": st.raft.LastIndex(),
"initial_store_log_applied_index": st.initialLastAppliedIndex,
"last_store_applied_index": st.lastAppliedIndex.Load(),
"last_snapshot_index": snapshotIndex(st.snapshotStore),
"raft_applied_index": st.raft.AppliedIndex(),
"raft_last_index": st.raft.LastIndex(),
"last_store_log_applied_index": st.lastAppliedIndexOnStart,
"last_store_applied_index": st.lastAppliedIndex.Load(),
"last_snapshot_index": snapshotIndex(st.snapshotStore),
}).Info("raft node constructed")

// There's no hard limit on the migration, so it should take as long as necessary.
Expand Down Expand Up @@ -516,8 +516,8 @@ func (f *Store) FindSimilarClass(name string) string {
// The value of "candidates" is a map[string]string of the current candidates IDs/addresses,
// see Store.candidates.
//
// The value of "initial_last_applied_index" is the index of the last applied command found when
// the store was opened, see Store.initialLastAppliedIndex.
// The value of "last_store_log_applied_index" is the index of the last applied command found when
// the store was opened, see Store.lastAppliedIndexOnStart.
//
// The value of "last_applied_index" is the index of the latest update to the store,
// see Store.lastAppliedIndex.
Expand All @@ -540,7 +540,7 @@ func (st *Store) Stats() map[string]any {
stats["open"] = st.open.Load()
stats["bootstrapped"] = st.bootstrapped.Load()
stats["candidates"] = st.candidates
stats["initial_last_applied_index"] = st.initialLastAppliedIndex
stats["last_store_log_applied_index"] = st.lastAppliedIndexOnStart
stats["last_applied_index"] = st.lastAppliedIndex.Load()
stats["db_loaded"] = st.dbLoaded.Load()

Expand Down Expand Up @@ -640,8 +640,29 @@ func (st *Store) Apply(l *raft.Log) interface{} {
panic("error proto un-marshalling log data")
}

schemaOnly := l.Index <= st.initialLastAppliedIndex
// schemaOnly is necessary so that on restart when we are re-applying RAFT log entries to our in-memory schema we
// don't update the database. This can lead to dataloss for example if we drop then re-add a class.
// If we don't have any last applied index on start, schema only is always false.
schemaOnly := st.lastAppliedIndexOnStart != 0 && l.Index <= st.lastAppliedIndexOnStart
defer func() {
// If we have an applied index from the previous store (i.e from disk). Then reload the DB once we catch up as
// that means we're done doing schema only.
if st.lastAppliedIndexOnStart != 0 && l.Index == st.lastAppliedIndexOnStart {
st.log.WithFields(logrus.Fields{
"log_type": l.Type,
"log_name": l.Type.String(),
"log_index": l.Index,
"last_store_log_applied_index": st.lastAppliedIndexOnStart,
}).Debug("reloading local DB as RAFT and local DB are now caught up")
cs := make([]command.UpdateClassRequest, len(st.db.Schema.Classes))
i := 0
for _, v := range st.db.Schema.Classes {
cs[i] = command.UpdateClassRequest{Class: &v.Class, State: &v.Sharding}
i++
}
st.db.store.ReloadLocalDB(context.Background(), cs)
}

st.lastAppliedIndex.Store(l.Index)
if ret.Error != nil {
st.log.WithFields(logrus.Fields{
Expand All @@ -657,12 +678,13 @@ func (st *Store) Apply(l *raft.Log) interface{} {

cmd.Version = l.Index
st.log.WithFields(logrus.Fields{
"log_type": l.Type,
"log_name": l.Type.String(),
"log_index": l.Index,
"cmd_type": cmd.Type,
"cmd_type_name": cmd.Type.String(),
"cmd_class": cmd.Class,
"log_type": l.Type,
"log_name": l.Type.String(),
"log_index": l.Index,
"cmd_type": cmd.Type,
"cmd_type_name": cmd.Type.String(),
"cmd_class": cmd.Class,
"cmd_schema_only": schemaOnly,
}).Debug("server.apply")
switch cmd.Type {

Expand Down Expand Up @@ -887,11 +909,11 @@ func (st *Store) reloadDBFromSnapshot() bool {
// the snapshot already includes the state from the raft log
snapIndex := snapshotIndex(st.snapshotStore)
st.log.WithFields(logrus.Fields{
"last_applied_index": st.lastAppliedIndex.Load(),
"initial_last_applied_index": st.initialLastAppliedIndex,
"last_snapshot_index": snapIndex,
"last_applied_index": st.lastAppliedIndex.Load(),
"last_store_log_applied_index": st.lastAppliedIndexOnStart,
"last_snapshot_index": snapIndex,
}).Info("load local db from snapshot")
if st.initialLastAppliedIndex <= snapIndex {
if st.lastAppliedIndexOnStart <= snapIndex {
st.openDatabase(ctx)
return true
}
Expand All @@ -908,7 +930,7 @@ func (st *Store) reloadDBFromSnapshot() bool {
st.db.store.ReloadLocalDB(context.Background(), cs)

st.dbLoaded.Store(true)
st.initialLastAppliedIndex = 0
st.lastAppliedIndexOnStart = 0
return true
}

Expand Down

0 comments on commit 660ca9c

Please sign in to comment.