Skip to content

Commit

Permalink
refactor: update lib to return stream of block events instead
Browse files Browse the repository at this point in the history
  • Loading branch information
oleonardolima committed May 31, 2022
1 parent b74cb39 commit 23d823e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 29 deletions.
28 changes: 25 additions & 3 deletions src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use bitcoin::{Address, Network};
use block_explorer_cli::BlockEvent;
use block_explorer_cli::{fetch_data_stream, MempoolSpaceWebSocketRequestData};
use clap::{ArgGroup, Parser, Subcommand};
use futures_util::pin_mut;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use std::str::FromStr;

Expand All @@ -9,7 +12,8 @@ use std::str::FromStr;
#[clap(author = "Leonardo L.")]
#[clap(version = "0.1")]
#[clap(about = "A work in progress CLI block explorer to be used with BDK, consuming data from mempool.space websocket.\n
This an initial competency test for Summer of Bitcoin 2022", long_about = None)]
This an initial competency test for Summer of Bitcoin 2022",
long_about = None)]

struct Cli {
#[clap(subcommand)]
Expand Down Expand Up @@ -57,13 +61,31 @@ struct TrackAddressMessage {
}

#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();

let data = build_request_data(&cli);
let network = cli.network;

fetch_data_stream(&network, &data).await.unwrap();
let data_stream = fetch_data_stream(&network, &data).await?;

pin_mut!(data_stream);

while let Some(data) = data_stream.next().await {
match data {
BlockEvent::Connected(block) => {
println!("received following event: Block Connected: {:#?}", block);
},
BlockEvent::Disconnected((height, block_hash)) => {
println!("received following event: Block Disconnected: [height {:#?}] [block_hash: {:#?}]", height, block_hash);
}
BlockEvent::Error() => {
eprint!("ERR: received an error from the data_stream");
}
}
};

Ok(())
}

#[allow(deprecated)]
Expand Down
29 changes: 12 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,25 @@ extern crate env_logger;

mod mempool_space;

pub use mempool_space::api::{MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestData};
use anyhow::Ok;
pub use mempool_space::{subscribe_to_new_blocks, build_websocket_request_message};
pub use mempool_space::api::{MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestData, BlockEvent};
use anyhow::{anyhow};
use bitcoin::Network;
use futures_util::StreamExt;
use mempool_space::{subscribe_to_new_blocks, build_websocket_request_message};
use futures_core::Stream;

use futures_util::pin_mut;

pub async fn fetch_data_stream(network: &Network, data: &MempoolSpaceWebSocketRequestData) -> anyhow::Result<()> {
pub async fn fetch_data_stream(network: &Network, data: &MempoolSpaceWebSocketRequestData) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
env_logger::init();

match data {
MempoolSpaceWebSocketRequestData::Blocks => {
let message = build_websocket_request_message(&data);
let block_stream = subscribe_to_new_blocks(&network, &message).await?;
pin_mut!(block_stream);

while let Some(block) = block_stream.next().await {
println!("received following new block: {:#?}", block);
};
subscribe_to_new_blocks(&network, &message).await
},
MempoolSpaceWebSocketRequestData::MempoolBlocks => {
return Err(anyhow!("currently the mempool-blocks feature is no implemented yet."));
},
MempoolSpaceWebSocketRequestData::TrackAddress(_address) => {
return Err(anyhow!("currently the track-address feature is no implemented yet."));
},
MempoolSpaceWebSocketRequestData::MempoolBlocks => { eprintln!("currently the mempool-blocks feature is no implemented yet.") },
MempoolSpaceWebSocketRequestData::TrackAddress(_address) => { eprintln!("currently the track-address feature is no implemented yet.") },
}

Ok(())
}
9 changes: 8 additions & 1 deletion src/mempool_space/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,12 @@ pub struct MempoolSpaceWebSocketRequestMessage {
pub enum MempoolSpaceWebSocketRequestData {
Blocks,
MempoolBlocks,
TrackAddress(Address), // TODO:(@leonardo.lima) Update it to use bitcoin::Address instead
TrackAddress(Address),
}

#[derive(Debug)]
pub enum BlockEvent {
Connected(BlockExtended),
Disconnected((u32, BlockHash)),
Error(),
}
6 changes: 3 additions & 3 deletions src/mempool_space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ pub mod api;
pub mod websocket;

use anyhow;
use api::{BlockExtended, MempoolSpaceWebSocketRequestMessage, MempoolSpaceWebSocketRequestData};
use api::{MempoolSpaceWebSocketRequestMessage, MempoolSpaceWebSocketRequestData, BlockEvent};
use bitcoin::Network;
use futures_core::Stream;
use url::Url;

pub async fn subscribe_to_new_blocks(network: &Network, message: &MempoolSpaceWebSocketRequestMessage) -> anyhow::Result<impl Stream<Item = BlockExtended>>{
pub async fn subscribe_to_new_blocks(network: &Network, message: &MempoolSpaceWebSocketRequestMessage) -> anyhow::Result<impl Stream<Item = BlockEvent>>{
let url: Url = url::Url::parse(&build_websocket_address(&network)).unwrap();
websocket::publish_message(url, &message).await
websocket::connect_and_publish_message(url, &message).await
}

pub fn build_websocket_request_message(data: &MempoolSpaceWebSocketRequestData) -> MempoolSpaceWebSocketRequestMessage {
Expand Down
9 changes: 4 additions & 5 deletions src/mempool_space/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use anyhow::{anyhow, Ok};
use futures_util::{SinkExt, StreamExt};
use std::{time::Duration};
use super::api::{MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestMessage, BlockExtended};
use super::api::{MempoolSpaceWebSocketMessage, MempoolSpaceWebSocketRequestMessage, BlockExtended, BlockEvent};
use tokio_tungstenite::connect_async_tls_with_config;
use tokio_tungstenite::tungstenite::protocol::Message;
use url::Url;

use async_stream::stream;
use futures_util::stream::Stream;

pub async fn publish_message(url: Url, message: &MempoolSpaceWebSocketRequestMessage) -> anyhow::Result<impl Stream<Item = BlockExtended>> {
pub async fn connect_and_publish_message(url: Url, message: &MempoolSpaceWebSocketRequestMessage) -> anyhow::Result<impl Stream<Item = BlockEvent>> {
let (mut websocket_stream, websocket_response) =
connect_async_tls_with_config(&url, None, None)
.await
Expand All @@ -35,9 +35,8 @@ pub async fn publish_message(url: Url, message: &MempoolSpaceWebSocketRequestMes
if let Some(message) = message {
match message.unwrap() {
Message::Text(text) => {
let obj: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
let block = obj.block;
yield block;
let res_message: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
yield BlockEvent::Connected(res_message.block);
},
Message::Close(_) => {
eprintln!("websocket closing gracefully");
Expand Down

0 comments on commit 23d823e

Please sign in to comment.