From b55c72bce624a4aacdc8566d75f7611a98f7f337 Mon Sep 17 00:00:00 2001 From: Wael Date: Sun, 28 May 2023 15:48:17 +0200 Subject: [PATCH 1/3] add start_newPendingTransactions_loop --- client/src/rpc/impls/eth_pubsub.rs | 40 ++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/client/src/rpc/impls/eth_pubsub.rs b/client/src/rpc/impls/eth_pubsub.rs index 74e94a471..6b52f952e 100644 --- a/client/src/rpc/impls/eth_pubsub.rs +++ b/client/src/rpc/impls/eth_pubsub.rs @@ -50,9 +50,12 @@ pub struct PubSubClient { handler: Arc, heads_subscribers: Arc>>, logs_subscribers: Arc>>, + pending_transactions_subscribers: Arc>>, epochs_ordered: Arc)>>, + new_block_hashes: Arc>, consensus: SharedConsensusGraph, heads_loop_started: Arc>, + pending_transactions_loop_started: Arc>, } impl PubSubClient { @@ -64,6 +67,7 @@ impl PubSubClient { { let heads_subscribers = Arc::new(RwLock::new(Subscribers::default())); let logs_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let pending_transactions_subscribers = Arc::new(RwLock::new(Subscribers::default())); let handler = Arc::new(ChainNotificationHandler { executor, @@ -76,9 +80,12 @@ impl PubSubClient { handler, heads_subscribers, logs_subscribers, + pending_transactions_subscribers, + new_block_hashes: notifications.new_block_hashes.clone(), epochs_ordered: notifications.epochs_ordered.clone(), consensus: consensus.clone(), heads_loop_started: Arc::new(RwLock::new(false)), + pending_transactions_loop_started: Arc::new(RwLock::new(false)), } } @@ -91,6 +98,32 @@ impl PubSubClient { self.epochs_ordered.clone() } + fn start_newPendingTransactions_loop(&self) { + let mut loop_started = self.pending_transactions_loop_started.write(); + if *loop_started { + return; + } + + debug!("start_pending_transactions_loop"); + *loop_started = true; + + let pending_tx_subscribers: Arc>>> = self.pending_transactions_subscribers.clone(); + + let mut receiver = self.new_block_hashes.subscribe(); + + let fut = async move { + while let Some(new_tx) = receiver.recv().await { + debug!("new_pending_transactions_loop: {:?}", new_tx); + + // publish new pending transaction + // pending_tx_subscribers.notify(new_tx); + } + }; + + let fut = fut.unit_error().boxed().compat(); + self.handler.executor.spawn(fut); + } + fn start_heads_loop(&self) { let mut loop_started = self.heads_loop_started.write(); if *loop_started { @@ -542,6 +575,13 @@ impl PubSub for PubSubClient { ) { let error = match (kind, params) { + // --------- newPendingTransactions --------- + (pubsub::Kind::NewPendingTransactions, None) => { + info!("eth pubsub newPendingTransactions"); + self.pending_transactions_subscribers.write().push(subscriber); + self.start_newPendingTransactions_loop(); + return; + } // --------- newHeads --------- (pubsub::Kind::NewHeads, None) => { info!("eth pubsub newheads"); From a5d068064a2f086e9230f3a3b324b78cc044c118 Mon Sep 17 00:00:00 2001 From: Wael Almattar Date: Sat, 3 Jun 2023 18:55:38 +0200 Subject: [PATCH 2/3] notify pending_transactions_subscribers --- client/src/rpc/impls/eth_pubsub.rs | 41 +++++++++++++++++++++++------- core/src/channel.rs | 2 ++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/client/src/rpc/impls/eth_pubsub.rs b/client/src/rpc/impls/eth_pubsub.rs index 6b52f952e..55faf7378 100644 --- a/client/src/rpc/impls/eth_pubsub.rs +++ b/client/src/rpc/impls/eth_pubsub.rs @@ -52,10 +52,10 @@ pub struct PubSubClient { logs_subscribers: Arc>>, pending_transactions_subscribers: Arc>>, epochs_ordered: Arc)>>, - new_block_hashes: Arc>, consensus: SharedConsensusGraph, heads_loop_started: Arc>, pending_transactions_loop_started: Arc>, + new_pending_transactions: Arc>, } impl PubSubClient { @@ -74,6 +74,7 @@ impl PubSubClient { consensus: consensus.clone(), data_man: consensus.get_data_manager().clone(), heads_subscribers: heads_subscribers.clone(), + pending_transactions_subscribers: pending_transactions_subscribers.clone() }); PubSubClient { @@ -81,9 +82,9 @@ impl PubSubClient { heads_subscribers, logs_subscribers, pending_transactions_subscribers, - new_block_hashes: notifications.new_block_hashes.clone(), epochs_ordered: notifications.epochs_ordered.clone(), consensus: consensus.clone(), + new_pending_transactions: notifications.new_pending_transactions.clone(), heads_loop_started: Arc::new(RwLock::new(false)), pending_transactions_loop_started: Arc::new(RwLock::new(false)), } @@ -98,7 +99,7 @@ impl PubSubClient { self.epochs_ordered.clone() } - fn start_newPendingTransactions_loop(&self) { + fn start_new_pending_transactions_loop(&self) { let mut loop_started = self.pending_transactions_loop_started.write(); if *loop_started { return; @@ -107,16 +108,15 @@ impl PubSubClient { debug!("start_pending_transactions_loop"); *loop_started = true; - let pending_tx_subscribers: Arc>>> = self.pending_transactions_subscribers.clone(); - - let mut receiver = self.new_block_hashes.subscribe(); + let mut receiver = self.new_pending_transactions.subscribe(); + let handler_clone = self.handler.clone(); + let fut = async move { while let Some(new_tx) = receiver.recv().await { debug!("new_pending_transactions_loop: {:?}", new_tx); - // publish new pending transaction - // pending_tx_subscribers.notify(new_tx); + handler_clone.notify_new_pending_transaction(new_tx); } }; @@ -265,6 +265,7 @@ pub struct ChainNotificationHandler { consensus: SharedConsensusGraph, data_man: Arc, heads_subscribers: Arc>>, + pending_transactions_subscribers: Arc>>, } impl ChainNotificationHandler { @@ -287,6 +288,28 @@ impl ChainNotificationHandler { let _ = fut.compat().await; } + // notify each subscriber about new pending transaction `hash` concurrently + fn notify_new_pending_transaction(&self, hash: H256) { + info!("notify_new_pending_transaction({:?})", hash); + + let subscribers = self.pending_transactions_subscribers.read(); + + // do not retrieve anything unnecessarily + if subscribers.is_empty() { + debug!("No subscribers for pending transactions"); + return; + } + + debug!("Notify {}", hash); + for subscriber in subscribers.values() { + Self::notify( + &self.executor, + subscriber, + pubsub::Result::TransactionHash(hash), + ); + } + } + // notify each subscriber about header `hash` concurrently // NOTE: multiple calls to this method will result in concurrent // notifications, so the headers published might be reordered. @@ -579,7 +602,7 @@ impl PubSub for PubSubClient { (pubsub::Kind::NewPendingTransactions, None) => { info!("eth pubsub newPendingTransactions"); self.pending_transactions_subscribers.write().push(subscriber); - self.start_newPendingTransactions_loop(); + self.start_new_pending_transactions_loop(); return; } // --------- newHeads --------- diff --git a/core/src/channel.rs b/core/src/channel.rs index b91e37256..e8d5712c4 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -113,6 +113,7 @@ pub struct Notifications { pub new_block_hashes: Arc>, pub epochs_ordered: Arc)>>, pub blame_verification_results: Arc)>>, /* */ + pub new_pending_transactions: Arc>, } impl Notifications { @@ -123,6 +124,7 @@ impl Notifications { blame_verification_results: Arc::new(Channel::new( "blame-verification-results", )), + new_pending_transactions: Arc::new(Channel::new("new-pending-transactions")), }) } } From 3d75c9cc902e2b24ef5ffc68bce2860e804f9834 Mon Sep 17 00:00:00 2001 From: Wael Almattar Date: Sun, 11 Jun 2023 01:14:15 +0200 Subject: [PATCH 3/3] improve logs --- client/src/rpc/impls/eth_pubsub.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/src/rpc/impls/eth_pubsub.rs b/client/src/rpc/impls/eth_pubsub.rs index 55faf7378..9dfe0c1f3 100644 --- a/client/src/rpc/impls/eth_pubsub.rs +++ b/client/src/rpc/impls/eth_pubsub.rs @@ -114,7 +114,7 @@ impl PubSubClient { let fut = async move { while let Some(new_tx) = receiver.recv().await { - debug!("new_pending_transactions_loop: {:?}", new_tx); + trace!("new_pending_transactions_loop: {:?}", new_tx); handler_clone.notify_new_pending_transaction(new_tx); } @@ -290,13 +290,13 @@ impl ChainNotificationHandler { // notify each subscriber about new pending transaction `hash` concurrently fn notify_new_pending_transaction(&self, hash: H256) { - info!("notify_new_pending_transaction({:?})", hash); + trace!("notify_new_pending_transaction({:?})", hash); let subscribers = self.pending_transactions_subscribers.read(); // do not retrieve anything unnecessarily if subscribers.is_empty() { - debug!("No subscribers for pending transactions"); + trace!("No subscribers for pending transactions"); return; }