Skip to content

Commit

Permalink
wip(lib): move ws connection and polling fn to lib module
Browse files Browse the repository at this point in the history
  • Loading branch information
oleonardolima committed May 18, 2022
1 parent 8347c45 commit 3191aa3
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ tokio = { version = "1.0.0", features = ["io-util", "io-std", "macros", "net", "
tokio-tungstenite = { version = "0.17.1", features = ["connect", "native-tls"]}
url = { version = "2.0.0" }

# [lib]
# name = "block_explorer_cli"
# path = "src/lib.rs"
[lib]
name = "block_explorer_cli"
path = "src/lib.rs"

[[bin]]
name = "block-explorer-cli"
Expand Down
52 changes: 2 additions & 50 deletions src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use anyhow::{anyhow, Ok};
use clap::{Subcommand, Parser};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use url::Url;
use std::{env, time::Duration};
use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::protocol::Message};
use std::{env};
use block_explorer_cli::fetch_blocks;

#[derive(Parser)]
#[clap(name = "CLI block explorer with mempool.space websocket - WIP")]
Expand Down Expand Up @@ -74,51 +71,6 @@ async fn main() {

}

async fn fetch_blocks(url: Url, message: String) -> anyhow::Result<()> {

let (mut websocket_stream, _ws_res) = connect_async_tls_with_config(url, None, None)
.await
.expect("failed to connect with url");
println!("websocket handshake successfully completed!");

if let Err(_) = websocket_stream.send(Message::text(message)).await {
return Err(anyhow!("Failed to send first message to websocket"));
}

// need to ping every so often to keep websocket alive
let mut pinger = tokio::time::interval(Duration::from_secs(60));

loop {
tokio::select! {
message = websocket_stream.next() => {
if let Some(message) = message {
match message? {
Message::Text(text) => {
let obj: serde_json::Value = serde_json::from_str(&text).unwrap();
println!("{}", serde_json::to_string_pretty(&obj).unwrap());
},
Message::Close(_) => {
eprintln!("websocket closing gracefully");
break;
},
Message::Binary(_) => {
eprintln!("unexpected binary message");
break;
},
_ => { /*ignore*/ }
}
}
}
_ = pinger.tick() => {
websocket_stream.send(Message::Ping(vec![])).await.unwrap()
}
}
}

Ok(())

}

fn build_request_message(cli: &Cli) -> String {

match &cli.command {
Expand Down
67 changes: 67 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use anyhow::{anyhow, Ok};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize};
use url::Url;
use std::{time::Duration};
use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::protocol::Message};

#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct MempoolSpaceBlock {
id: String, // TODO: (@leonardo.lima) parse this into BlockHash type from rust-bitcoin
previousblockhash: String, // TODO: (@leonardo.lima) parse this into BlockHash type from rust-bitcoin
height: u32,
timestamp: u32,
}

#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct MempoolSpaceWebSocketMessage {
block: MempoolSpaceBlock,
// TODO: (@leonardo.lima) should we use the other fields: difficult adjustment, mempool-info ?
}

pub async fn fetch_blocks(url: Url, message: String) -> anyhow::Result<()> {

let (mut websocket_stream, _ws_res) = connect_async_tls_with_config(url, None, None)
.await
.expect("failed to connect with url");
println!("websocket handshake successfully completed!");

if let Err(_) = websocket_stream.send(Message::text(message)).await {
return Err(anyhow!("Failed to send first message to websocket"));
}

// need to ping every so often to keep websocket alive
let mut pinger = tokio::time::interval(Duration::from_secs(60));

loop {
tokio::select! {
message = websocket_stream.next() => {
if let Some(message) = message {
match message? {
Message::Text(text) => {
let obj: MempoolSpaceWebSocketMessage = serde_json::from_str(&text).unwrap();
println!("{:?}", &obj);
},
Message::Close(_) => {
eprintln!("websocket closing gracefully");
break;
},
Message::Binary(_) => {
eprintln!("unexpected binary message");
break;
},
_ => { /*ignore*/ }
}
}
}
_ = pinger.tick() => {
websocket_stream.send(Message::Ping(vec![])).await.unwrap()
}
}
}

Ok(())

}

0 comments on commit 3191aa3

Please sign in to comment.