Skip to content

Commit

Permalink
Merge pull request #1511 from CortexFoundation/dev
Browse files Browse the repository at this point in the history
atomic type
  • Loading branch information
ucwong committed Apr 27, 2023
2 parents f30a06b + cc54e98 commit 5e0df0d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 25 deletions.
5 changes: 2 additions & 3 deletions ctxc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"math/big"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/CortexFoundation/CortexTheseus/accounts"
Expand Down Expand Up @@ -503,7 +502,7 @@ func (s *Cortex) StartMining(threads int) error {
}
// If mining is started, we can disable the transaction rejection mechanism
// introduced to speed sync times.
atomic.StoreUint32(&s.protocolManager.acceptTxs, 1)
s.protocolManager.acceptTxs.Store(true)

go s.miner.Start(eb)
}
Expand Down Expand Up @@ -537,7 +536,7 @@ func (s *Cortex) IsListening() bool { return true } // Always l
func (s *Cortex) CortexVersion() int { return int(ProtocolVersions[0]) }
func (s *Cortex) NetVersion() uint64 { return s.networkID }
func (s *Cortex) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
func (s *Cortex) Synced() bool { return atomic.LoadUint32(&s.protocolManager.acceptTxs) == 1 }
func (s *Cortex) Synced() bool { return s.protocolManager.acceptTxs.Load() }
func (s *Cortex) ArchiveMode() bool { return s.config.NoPruning }
func (s *Cortex) CheckPoint() uint64 { return s.protocolManager.checkpointNumber }
func (s *Cortex) CheckPointName() string { return s.protocolManager.checkpointName }
Expand Down
20 changes: 10 additions & 10 deletions ctxc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ type ProtocolManager struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
fastSync atomic.Bool // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)

checkpointNumber uint64 // Block number for the sync progress validator to cross reference
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
Expand Down Expand Up @@ -171,7 +171,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
// In these cases however it's safe to reenable fast sync.
fullBlock, fastBlock := blockchain.CurrentBlock(), blockchain.CurrentFastBlock()
if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
manager.fastSync = uint32(1)
manager.fastSync.Store(true)
log.Warn("Switch sync mode from full sync to fast sync")
}
} else {
Expand All @@ -180,7 +180,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
log.Warn("Switch sync mode from fast sync to full sync")
} else {
// If fast sync was requested and our database is empty, grant it
manager.fastSync = uint32(1)
manager.fastSync.Store(true)
}
}

Expand All @@ -195,7 +195,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
}
// Initiate a sub-protocol for every implemented version we can handle
var stateBloom *trie.SyncBloom
if atomic.LoadUint32(&manager.fastSync) == 1 {
if manager.fastSync.Load() {
stateBloom = trie.NewSyncBloom(uint64(cacheLimit), chaindb)
}
manager.downloader = downloader.New(manager.checkpointNumber, chaindb, stateBloom, manager.eventMux, blockchain, manager.removePeer)
Expand Down Expand Up @@ -223,13 +223,13 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
// accept each others' blocks until a restart. Unfortunately we haven't figured
// out a way yet where nodes can decide unilaterally whether the network is new
// or not. This should be fixed if we figure out a solution.
if atomic.LoadUint32(&manager.fastSync) == 1 {
if manager.fastSync.Load() {
log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
n, err := manager.blockchain.InsertChain(blocks)
if err == nil {
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
manager.acceptTxs.Store(true)
}
return n, err
}
Expand Down Expand Up @@ -560,7 +560,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// If we're doing a fast sync, we must enforce the checkpoint block to avoid
// eclipse attacks. Unsynced nodes are welcome to connect after we're done
// joining the network
if atomic.LoadUint32(&pm.fastSync) == 1 {
if pm.fastSync.Load() {
p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name())
return errors.New("unsynced node cannot serve fast sync")
}
Expand Down Expand Up @@ -808,7 +808,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
case msg.Code == ctxc.NewPooledTransactionHashesMsg && p.version >= ctxc65:
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
if !pm.acceptTxs.Load() {
break
}
var hashes []common.Hash
Expand Down Expand Up @@ -859,7 +859,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {

case msg.Code == ctxc.TransactionMsg || (msg.Code == ctxc.PooledTransactionsMsg && p.version >= ctxc65):
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
if !pm.acceptTxs.Load() {
break
}
// Transactions can be processed, parse all of them and deliver to the pool
Expand Down
5 changes: 2 additions & 3 deletions ctxc/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package ctxc
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -197,7 +196,7 @@ func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, 65) }
func testRecvTransactions(t *testing.T, protocol int) {
txAdded := make(chan []*types.Transaction)
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, txAdded)
pm.acceptTxs = 1 // mark synced to accept transactions
pm.acceptTxs.Store(true)
p, _ := newTestPeer("peer", protocol, pm, true)
defer pm.Stop()
defer p.close()
Expand Down Expand Up @@ -331,7 +330,7 @@ func testSyncTransaction(t *testing.T, propagtion bool) {

time.Sleep(250 * time.Millisecond)
pmFetcher.doSync(peerToSyncOp(downloader.FullSync, pmFetcher.peers.BestPeer()))
atomic.StoreUint32(&pmFetcher.acceptTxs, 1)
pmFetcher.acceptTxs.Store(true)

newTxs := make(chan core.NewTxsEvent, 1024)
sub := pmFetcher.txpool.SubscribeNewTxsEvent(newTxs)
Expand Down
9 changes: 4 additions & 5 deletions ctxc/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package ctxc
import (
"math/big"
"math/rand"
"sync/atomic"
"time"

"github.com/CortexFoundation/CortexTheseus/common"
Expand Down Expand Up @@ -273,7 +272,7 @@ func peerToSyncOp(mode downloader.SyncMode, p *peer) *chainSyncOp {

func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) {
// If we're in fast sync mode, return that directly
if atomic.LoadUint32(&cs.pm.fastSync) == 1 {
if cs.pm.fastSync.Load() {
block := cs.pm.blockchain.CurrentFastBlock()
td := cs.pm.blockchain.GetTdByHash(block.Hash())
return downloader.FastSync, td
Expand Down Expand Up @@ -324,9 +323,9 @@ func (pm *ProtocolManager) doSync(op *chainSyncOp) error {
if err != nil {
return err
}
if atomic.LoadUint32(&pm.fastSync) == 1 {
if pm.fastSync.Load() {
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&pm.fastSync, 0)
pm.fastSync.Store(false)
}

// If we've successfully finished a sync cycle and passed any required checkpoint,
Expand All @@ -336,7 +335,7 @@ func (pm *ProtocolManager) doSync(op *chainSyncOp) error {
// Checkpoint passed, sanity check the timestamp to have a fallback mechanism
// for non-checkpointed (number = 0) private networks.
if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
atomic.StoreUint32(&pm.acceptTxs, 1)
pm.acceptTxs.Store(true)
}
}

Expand Down
7 changes: 3 additions & 4 deletions ctxc/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package ctxc

import (
"sync/atomic"
"testing"
"time"

Expand All @@ -37,12 +36,12 @@ func testFastSyncDisabling(t *testing.T, protocol int) {

// Create a pristine protocol manager, check that fast sync is left enabled
pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil)
if atomic.LoadUint32(&pmEmpty.fastSync) == 0 {
if !pmEmpty.fastSync.Load() {
t.Fatalf("fast sync disabled on pristine blockchain")
}
// Create a full protocol manager, check that fast sync gets disabled
pmFull, _ := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil)
if atomic.LoadUint32(&pmFull.fastSync) == 1 {
if pmFull.fastSync.Load() {
t.Fatalf("fast sync not disabled on non-empty blockchain")
}

Expand All @@ -58,7 +57,7 @@ func testFastSyncDisabling(t *testing.T, protocol int) {
}

// Check that fast sync was disabled
if atomic.LoadUint32(&pmEmpty.fastSync) == 1 {
if pmEmpty.fastSync.Load() {
t.Fatalf("fast sync not disabled after successful synchronisation")
}
}

0 comments on commit 5e0df0d

Please sign in to comment.