Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch off gossiped txs during block production #7010

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,10 @@ void SetTotalDifficultyDeep(BlockHeader current)
if (_logger.IsTrace) _logger.Trace($"Calculated total difficulty for {header} is {header.TotalDifficulty}");
}

public void OnBlocksProcessing(IReadOnlyList<Block> blocks) => BlocksProcessing?.Invoke(this, blocks);

public event EventHandler<IReadOnlyList<Block>>? BlocksProcessing;

public event EventHandler<BlockReplacementEventArgs>? BlockAddedToMain;

public event EventHandler<OnUpdateMainChainArgs>? OnUpdateMainChain;
Expand Down
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/ChainHeadInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Nethermind.Blockchain.Spec;
using Nethermind.Core;
Expand Down Expand Up @@ -34,6 +35,7 @@ public ChainHeadInfoProvider(IChainHeadSpecProvider specProvider, IBlockTree blo
HeadNumber = blockTree.BestKnownNumber;

blockTree.BlockAddedToMain += OnHeadChanged;
blockTree.BlocksProcessing += OnHeadProcessing;
}

public IChainHeadSpecProvider SpecProvider { get; }
Expand All @@ -49,6 +51,7 @@ public ChainHeadInfoProvider(IChainHeadSpecProvider specProvider, IBlockTree blo
public UInt256 CurrentPricePerBlobGas { get; internal set; }

public event EventHandler<BlockReplacementEventArgs>? HeadChanged;
public event EventHandler<IReadOnlyList<Block>>? HeadProcessing;

private void OnHeadChanged(object? sender, BlockReplacementEventArgs e)
{
Expand All @@ -61,5 +64,8 @@ private void OnHeadChanged(object? sender, BlockReplacementEventArgs e)
: UInt256.Zero;
HeadChanged?.Invoke(sender, e);
}

private void OnHeadProcessing(object? sender, IReadOnlyList<Block> blocks)
=> HeadProcessing?.Invoke(sender, blocks);
}
}
6 changes: 5 additions & 1 deletion src/Nethermind/Nethermind.Blockchain/IBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ public interface IBlockTree : IBlockFinder
/// A block is marked as canon
/// </summary>
event EventHandler<BlockReplacementEventArgs> BlockAddedToMain;

/// <summary>
/// Blocks are being processed
/// </summary>
event EventHandler<IReadOnlyList<Block>> BlocksProcessing;
/// <summary>
/// A block is now set as head
/// </summary>
Expand All @@ -185,5 +188,6 @@ public interface IBlockTree : IBlockFinder
void UpdateBeaconMainChain(BlockInfo[]? blockInfos, long clearBeaconMainChainStartPoint);

void RecalculateTreeLevels();
void OnBlocksProcessing(IReadOnlyList<Block> blocks);
}
}
5 changes: 4 additions & 1 deletion src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Int256;

namespace Nethermind.Blockchain
{
Expand All @@ -20,10 +19,12 @@ namespace Nethermind.Blockchain
public class ReadOnlyBlockTree : IReadOnlyBlockTree
{
private readonly IBlockTree _wrapped;
public event EventHandler<IReadOnlyList<Block>> BlocksProcessing;

public ReadOnlyBlockTree(IBlockTree wrapped)
{
_wrapped = wrapped;
_wrapped.BlocksProcessing += (e, blocks) => OnBlocksProcessing(blocks);
}

public ulong NetworkId => _wrapped.NetworkId;
Expand Down Expand Up @@ -197,5 +198,7 @@ public int DeleteChainSlice(in long startNumber, long? endNumber = null, bool fo
public void UpdateMainChain(IReadOnlyList<Block> blocks, bool wereProcessed, bool forceHeadBlock = false) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(UpdateMainChain)} calls");

public void ForkChoiceUpdated(Hash256? finalizedBlockHash, Hash256? safeBlockBlockHash) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(ForkChoiceUpdated)} calls");

public void OnBlocksProcessing(IReadOnlyList<Block> blocks) => BlocksProcessing?.Invoke(this, blocks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private void FireProcessingQueueEmpty()

ProcessingBranch processingBranch = PrepareProcessingBranch(suggestedBlock, options);
PrepareBlocksToProcess(suggestedBlock, options, processingBranch);
_blockTree.OnBlocksProcessing(processingBranch.Blocks);

_stopwatch.Restart();
Block[]? processedBlocks = ProcessBranch(processingBranch, options, tracer, out error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void DisableTxFiltering()
public override int MessageIdSpaceSize => 8;
public override string Name => "eth62";
protected override TimeSpan InitTimeout => Timeouts.Eth62Status;
protected bool CanReceiveTransactions => _txGossipPolicy.ShouldListenToGossipedTransactions;
protected bool CanReceiveTransactions => _txGossipPolicy.ShouldListenToGossipedTransactions && _txPool.IsAcceptingTxs;

public override event EventHandler<ProtocolInitializedEventArgs>? ProtocolInitialized;

Expand Down
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.TxPool/IChainHeadInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;

using Nethermind.Core;
using Nethermind.Core.Specs;
using Nethermind.Int256;
Expand All @@ -23,5 +25,6 @@ public interface IChainHeadInfoProvider
public UInt256 CurrentPricePerBlobGas { get; }

event EventHandler<BlockReplacementEventArgs> HeadChanged;
event EventHandler<IReadOnlyList<Block>>? HeadProcessing;
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.TxPool/ITxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Nethermind.TxPool
{
public interface ITxPool
{
bool IsAcceptingTxs { get; }

int GetPendingTransactionsCount();
int GetPendingBlobTransactionsCount();
Transaction[] GetPendingTransactions();
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.TxPool/NullTxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class NullTxPool : ITxPool
private NullTxPool() { }

public static NullTxPool Instance { get; } = new();
public bool IsAcceptingTxs => true;

public int GetPendingTransactionsCount() => 0;
public int GetPendingBlobTransactionsCount() => 0;
Expand Down
16 changes: 15 additions & 1 deletion src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Nethermind.Logging;
using Nethermind.TxPool.Collections;
using ITimer = Nethermind.Core.Timers.ITimer;
using System.Diagnostics;

namespace Nethermind.TxPool
{
Expand All @@ -27,6 +28,7 @@ internal class TxBroadcaster : IDisposable
private readonly IChainHeadInfoProvider _headInfo;
private readonly ITxGossipPolicy _txGossipPolicy;
private readonly Func<Transaction, bool> _gossipFilter;
private readonly Stopwatch _lastBroadcastPersistent = Stopwatch.StartNew();

/// <summary>
/// Timer for rebroadcasting pending own transactions.
Expand Down Expand Up @@ -143,7 +145,13 @@ public void AnnounceOnce(ITxPoolPeer peer, Transaction[] txs)
public void OnNewHead()
{
_baseFeeThreshold = CalculateBaseFeeThreshold();
BroadcastPersistentTxs();
ulong milliseconds = (ulong)_lastBroadcastPersistent.ElapsedMilliseconds;
_lastBroadcastPersistent.Restart();
// Are we processing real-time blocks or are we catching up?
if (milliseconds > 500)
{
BroadcastPersistentTxs();
}
}

internal UInt256 CalculateBaseFeeThreshold()
Expand Down Expand Up @@ -210,6 +218,8 @@ internal void BroadcastPersistentTxs()
{
if (_logger.IsDebug) _logger.Debug($"PeerNotificationThreshold is not a positive value: {_txPoolConfig.PeerNotificationThreshold}. Skipping broadcasting persistent transactions.");
}

ResumeBroadcastingTxs();
}

internal (IList<Transaction>? TransactionsToSend, IList<Transaction>? HashesToSend) GetPersistentTxsToSend()
Expand Down Expand Up @@ -286,6 +296,10 @@ public void EnsureStopBroadcastUpToNonce(Address address, UInt256 nonce)
}
}

public void StopBroadcastingTxs() => _timer.Enabled = false;

public void ResumeBroadcastingTxs() => _timer.Enabled = true;

private void TimerOnElapsed(object? sender, EventArgs args)
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down
31 changes: 29 additions & 2 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class TxPool : ITxPool, IDisposable
private readonly ITimer? _timer;
private Transaction[]? _transactionSnapshot;
private Transaction[]? _blobTransactionSnapshot;
private bool _notAcceptingTxs;

public bool IsAcceptingTxs => !_notAcceptingTxs;

/// <summary>
/// This class stores all known pending transactions that can be used for block production
Expand Down Expand Up @@ -116,6 +119,7 @@ public class TxPool : ITxPool, IDisposable
if (_blobTransactions.Count > 0) _blobTransactions.UpdatePool(_accounts, _updateBucket);

_headInfo.HeadChanged += OnHeadChange;
_headInfo.HeadProcessing += OnHeadProcessing;

_preHashFilters = new IIncomingTxFilter[]
{
Expand Down Expand Up @@ -178,6 +182,13 @@ public class TxPool : ITxPool, IDisposable

public int GetPendingBlobTransactionsCount() => _blobTransactions.Count;

private void OnHeadProcessing(object? sender, IReadOnlyList<Block> e)
{
_notAcceptingTxs = true;
_broadcaster.StopBroadcastingTxs();
if (_logger.IsDebug) _logger.Debug("TxPool Not accepting txs");
}

private void OnHeadChange(object? sender, BlockReplacementEventArgs e)
{
try
Expand Down Expand Up @@ -209,6 +220,8 @@ private void ProcessNewHeads()
ReAddReorganisedTransactions(args.PreviousBlock);
RemoveProcessedTransactions(args.Block);
UpdateBuckets();
_notAcceptingTxs = false;
if (_logger.IsDebug) _logger.Debug("TxPool Accepting txs");
_broadcaster.OnNewHead();
Metrics.TransactionCount = _transactions.Count;
Metrics.BlobTransactionCount = _blobTransactions.Count;
Expand Down Expand Up @@ -242,7 +255,7 @@ private void ReAddReorganisedTransactions(Block? previousBlock)
continue;
}
_hashCache.Delete(tx.Hash!);
SubmitTx(tx, isEip155Enabled ? TxHandlingOptions.None : TxHandlingOptions.PreEip155Signing);
SubmitTxInternal(tx, isEip155Enabled ? TxHandlingOptions.None : TxHandlingOptions.PreEip155Signing);
}

if (_blobReorgsSupportEnabled
Expand All @@ -254,7 +267,7 @@ private void ReAddReorganisedTransactions(Block? previousBlock)
if (_logger.IsTrace) _logger.Trace($"Readded tx {blobTx.Hash} from reorged block {previousBlock.Number} (hash {previousBlock.Hash}) to blob pool");
_hashCache.Delete(blobTx.Hash!);
blobTx.SenderAddress ??= _ecdsa.RecoverAddress(blobTx);
SubmitTx(blobTx, isEip155Enabled ? TxHandlingOptions.None : TxHandlingOptions.PreEip155Signing);
SubmitTxInternal(blobTx, isEip155Enabled ? TxHandlingOptions.None : TxHandlingOptions.PreEip155Signing);
}
if (_logger.IsDebug) _logger.Debug($"Readded txs from reorged block {previousBlock.Number} (hash {previousBlock.Hash}) to blob pool");

Expand Down Expand Up @@ -366,7 +379,21 @@ public void RemovePeer(PublicKey nodeId)
public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions)
{
Metrics.PendingTransactionsReceived++;
if (_notAcceptingTxs &&
(handlingOptions & TxHandlingOptions.PersistentBroadcast) == 0 &&
!tx.IsSystem())
{
// In block processing mode, we don't accept new transactions unless local
Metrics.PendingTransactionsDiscarded++;
// Isn't a busy response so we will just say already known
return AcceptTxResult.AlreadyKnown;
}

return SubmitTxInternal(tx, handlingOptions);
}

private AcceptTxResult SubmitTxInternal(Transaction tx, TxHandlingOptions handlingOptions)
{
// assign a sequence number to transaction so we can order them by arrival times when
// gas prices are exactly the same
tx.PoolIndex = Interlocked.Increment(ref _txIndex);
Expand Down
Loading