Skip to content

Commit

Permalink
--autoreconnect-delay-millis
Browse files Browse the repository at this point in the history
Addresses #66
  • Loading branch information
vi committed Jan 9, 2020
1 parent c28b2b5 commit 682d52b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate tokio_reactor;
extern crate tokio_tcp;
extern crate tokio_udp;
extern crate tokio_codec;
extern crate tokio_timer;
extern crate websocket;
extern crate websocket_base;
extern crate http_bytes;
Expand All @@ -41,8 +42,6 @@ extern crate smart_default;
#[macro_use]
extern crate derivative;

extern crate tokio_timer;

use futures::future::Future;
use tokio_io::{AsyncRead, AsyncWrite};

Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ struct Opt {
/// Maximum number of messages to copy in the other direction.
#[structopt(long = "max-messages-rev")]
max_messages_rev: Option<usize>,

/// [A] Delay before reconnect attempt for `autoreconnect:` overlay.
#[structopt(long = "--autoreconnect-delay-millis", default_value="20")]
autoreconnect_delay_millis: u64,
}

// TODO: make it byte-oriented/OsStr?
Expand Down Expand Up @@ -688,6 +692,7 @@ fn run() -> Result<()> {
no_exit_on_zeromsg
max_messages
max_messages_rev
autoreconnect_delay_millis
);
#[cfg(feature = "ssl")]
{
Expand Down
2 changes: 2 additions & 0 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,6 @@ pub struct Options {
pub request_uri: Option<http::Uri>,
pub request_method: Option<http::Method>,
pub request_headers: Vec<(http::header::HeaderName, http::header::HeaderValue)>,

pub autoreconnect_delay_millis: u64,
}
22 changes: 21 additions & 1 deletion src/reconnect_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct State {
n: Option<BoxedNewPeerFuture>,
cp: ConstructParams,
aux: State2,
reconnect_delay: std::time::Duration,
ratelimiter: Option<tokio_timer::Delay>,
}

/// This implementation's poll is to be reused many times, both after returning item and error
Expand All @@ -73,6 +75,16 @@ impl State {
let aux = &mut self.aux;

loop {
if let Some(delay) = self.ratelimiter.as_mut() {
match delay.poll() {
Ok(Async::Ready(_)) => {
debug!("Waited for reconnect");
self.ratelimiter = None;
}
Err(e) => error!("tokio-timer's Delay: {}", e),
Ok(Async::NotReady) => return Ok(Async::NotReady),
}
}
let cp = self.cp.clone();
if let Some(ref mut p) = *pp {
return Ok(Async::Ready(p));
Expand All @@ -98,8 +110,13 @@ impl State {

if !aux.already_warned {
aux.already_warned = true;
error!("Reconnecting failed. Trying again in tight endless loop.");
warn!("Reconnecting failed. Further failed reconnects announncements will have lower severity.");
} else {
info!("Reconnecting failed.");
}

self.ratelimiter = Some(tokio_timer::Delay::new(std::time::Instant::now() + self.reconnect_delay));
continue;
}
}
}
Expand Down Expand Up @@ -203,12 +220,15 @@ impl AsyncWrite for PeerHandle {
}

pub fn autoreconnector(s: Rc<dyn Specifier>, cp: ConstructParams) -> BoxedNewPeerFuture {
let reconnect_delay = std::time::Duration::from_millis(cp.program_options.autoreconnect_delay_millis);
let s = Rc::new(RefCell::new(State {
cp,
s,
p: None,
n: None,
aux: Default::default(),
reconnect_delay,
ratelimiter: None,
}));
let ph1 = PeerHandle(s.clone());
let ph2 = PeerHandle(s);
Expand Down

0 comments on commit 682d52b

Please sign in to comment.