Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send TCP protocol header to ignore non-rerun clients #6253

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions crates/re_sdk_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ pub use {buffered_client::Client, tcp_client::ClientError};
mod server;

#[cfg(feature = "server")]
pub use server::{serve, ServerError, ServerOptions};
pub use server::{serve, ConnectionError, ServerError, ServerOptions};

pub const PROTOCOL_VERSION: u16 = 0;
pub const PROTOCOL_VERSION_0: u16 = 0;

/// Added [`PROTOCOL_HEADER`]. Introduced for Rerun 0.16.
pub const PROTOCOL_VERSION_1: u16 = 1;

/// Comes after version.
pub const PROTOCOL_HEADER: &str = "rerun";

pub const DEFAULT_SERVER_PORT: u16 = 9876;

Expand Down
62 changes: 45 additions & 17 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub enum ServerError {
}

#[derive(thiserror::Error, Debug)]
enum VersionError {
pub enum VersionError {
#[error("SDK client is using an older protocol version ({client_version}) than the SDK server ({server_version})")]
ClientIsOlder {
client_version: u16,
Expand All @@ -37,7 +37,10 @@ enum VersionError {
}

#[derive(thiserror::Error, Debug)]
enum ConnectionError {
pub enum ConnectionError {
#[error("An unknown client tried to connect")]
UnknownClient,

#[error(transparent)]
VersionError(#[from] VersionError),

Expand Down Expand Up @@ -190,8 +193,17 @@ fn spawn_client(
return;
}
}
re_log::warn_once!("Closing connection to client at {addr_string}: {err}");
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.to_string().into();

let log_msg = format!("Closing connection to client at {addr_string}: {err}");
if matches!(&err, ConnectionError::UnknownClient) {
// An unknown client that probably stumbled onto the wrong port.
// Don't log as an error (https://github.com/rerun-io/rerun/issues/5883).
re_log::debug!("{log_msg}");
} else {
re_log::warn_once!("{log_msg}");
}

let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
tx.quit(Some(err)).ok(); // best-effort at this point
}
}
Expand All @@ -207,21 +219,37 @@ fn run_client(
stream.read_exact(&mut client_version)?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
std::cmp::Ordering::Less => {
return Err(ConnectionError::VersionError(VersionError::ClientIsOlder {
client_version,
server_version: crate::PROTOCOL_VERSION,
}));
// The server goes into a backward compat mode
// if the client sends version 0
if client_version == crate::PROTOCOL_VERSION_0 {
// Backwards compatibility mode: no protocol header, otherwise the same as version 1.
re_log::warn!("Client is using an old protocol version from before 0.16.");
} else {
// The protocol header was added in version 1
let mut protocol_header = [0_u8; crate::PROTOCOL_HEADER.len()];
stream.read_exact(&mut protocol_header)?;

if std::str::from_utf8(&protocol_header) != Ok(crate::PROTOCOL_HEADER) {
return Err(ConnectionError::UnknownClient);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
return Err(ConnectionError::VersionError(VersionError::ClientIsNewer {
client_version,
server_version: crate::PROTOCOL_VERSION,
}));

let server_version = crate::PROTOCOL_VERSION_1;
match client_version.cmp(&server_version) {
std::cmp::Ordering::Less => {
return Err(ConnectionError::VersionError(VersionError::ClientIsOlder {
client_version,
server_version,
}));
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
return Err(ConnectionError::VersionError(VersionError::ClientIsNewer {
client_version,
server_version,
}));
}
}
}
};

let mut congestion_manager = CongestionManager::new(options.max_latency_sec);

Expand Down
6 changes: 5 additions & 1 deletion crates/re_sdk_comms/src/tcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ impl TcpClient {
match TcpStream::connect_timeout(&self.addr, timeout) {
Ok(mut stream) => {
re_log::debug!("Connected to {:?}.", self.addr);
if let Err(err) = stream.write(&crate::PROTOCOL_VERSION.to_le_bytes()) {

if let Err(err) = stream
.write(&crate::PROTOCOL_VERSION_1.to_le_bytes())
.and_then(|_| stream.write(crate::PROTOCOL_HEADER.as_bytes()))
{
self.stream_state = TcpStreamState::Pending {
start_time,
num_attempts: num_attempts + 1,
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ re_log_types.workspace = true
re_memory.workspace = true
re_query.workspace = true
re_renderer = { workspace = true, default-features = false }
re_sdk_comms.workspace = true
re_smart_channel.workspace = true
re_space_view.workspace = true
re_space_view_bar_chart.workspace = true
Expand Down
18 changes: 17 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,23 @@ impl App {
re_smart_channel::SmartMessagePayload::Msg(msg) => msg,
re_smart_channel::SmartMessagePayload::Quit(err) => {
if let Some(err) = err {
re_log::warn!("Data source {} has left unexpectedly: {err}", msg.source);
let log_msg =
format!("Data source {} has left unexpectedly: {err}", msg.source);

#[cfg(not(target_arch = "wasm32"))]
if err
.downcast_ref::<re_sdk_comms::ConnectionError>()
.is_some_and(|e| {
matches!(e, re_sdk_comms::ConnectionError::UnknownClient)
})
{
// An unknown client that probably stumbled onto the wrong port.
// Don't log as an error (https://github.com/rerun-io/rerun/issues/5883).
re_log::debug!("{log_msg}");
continue;
}

re_log::warn!("{log_msg}");
} else {
re_log::debug!("Data source {} has finished", msg.source);
}
Expand Down