Skip to content

Commit

Permalink
Add logs for when MPSC tokio channels full.
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Jun 9, 2024
1 parent 7444351 commit 2209287
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 17 deletions.
18 changes: 18 additions & 0 deletions rust/suibase/crates/common/src/basic_types/log_safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,21 @@ macro_rules! log_safe {
.await;
};
}

// A macro that check if a MPSC channel has more element queued
// then the threshold. When exceeding, display a message using
// a safe logger.
#[macro_export]
macro_rules! mpsc_q_check {
($param:expr) => {
if $param.len() > $crate::basic_types::MPSC_Q_THRESHOLD {
$crate::basic_types::LOG_SAFE
.info(
&format!("Queue size over threshold: {}", $param.len()),
file!(),
line!(),
)
.await;
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub const EVENT_LEVEL_TRACE: u8 = 5u8;
pub const EVENT_LEVEL_MIN: u8 = EVENT_LEVEL_ERROR;
pub const EVENT_LEVEL_MAX: u8 = EVENT_LEVEL_TRACE;

pub const MPSC_Q_SIZE: usize = 200;
pub const MPSC_Q_THRESHOLD: usize = 150; // Will log an error if queue reaching this level.

/*
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
2 changes: 2 additions & 0 deletions rust/suibase/crates/common/src/workers/shell_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use anyhow::Result;
use tokio_graceful_shutdown::{FutureExt, SubsystemHandle};

use crate::basic_types::{GenericChannelMsg, GenericRx, WorkdirIdx};
use crate::mpsc_q_check;

use home::home_dir;

Expand Down Expand Up @@ -182,6 +183,7 @@ impl ShellWorker {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.event_rx.recv().await {
mpsc_q_check!(self.event_rx);
// Process the message.
self.do_exec(msg).await;
} else {
Expand Down
5 changes: 3 additions & 2 deletions rust/suibase/crates/dtp-daemon/src/admin_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl AdminController {

// Instantiate and start the ShellWorker if not already done.
if wd_tracking.shell_worker_handle.is_none() {
let (shell_worker_tx, shell_worker_rx) = tokio::sync::mpsc::channel(100);
let (shell_worker_tx, shell_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
wd_tracking.shell_worker_tx = Some(shell_worker_tx);
let shell_worker =
ShellWorker::new(self.globals.clone(), shell_worker_rx, Some(workdir_idx));
Expand Down Expand Up @@ -463,7 +463,7 @@ impl AdminController {
// As needed, start a WebSocketWorker for this workdir.
if wd_tracking.websocket_worker_handle.is_none() {
if workdir_config.is_user_request_start() {
let (websocket_worker_tx, websocket_worker_rx) = tokio::sync::mpsc::channel(100);
let (websocket_worker_tx, websocket_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);

let websocket_worker_params = WebSocketWorkerParams::new(
self.globals.clone(),
Expand Down Expand Up @@ -498,6 +498,7 @@ impl AdminController {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.admctrl_rx.recv().await {
common::mpsc_q_check!(self.admctrl_rx);
match msg.event_id {
EVENT_AUDIT => {
self.process_audit_msg(msg).await;
Expand Down
5 changes: 3 additions & 2 deletions rust/suibase/crates/dtp-daemon/src/network_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use crate::shared_types::InputPort;
use common::basic_types::*;
use common::{basic_types::*, mpsc_q_check};

use crate::shared_types::{
GlobalsProxyMT, RequestFailedReason, SendFailedReason, ServerStats, TargetServer,
Expand Down Expand Up @@ -760,6 +760,7 @@ impl NetworkMonitor {
if cur_msg.is_none() {
// Wait for a message.
cur_msg = self.netmon_rx.recv().await;
mpsc_q_check!(self.netmon_rx);
if cur_msg.is_none() || subsys.is_shutdown_requested() {
// Channel closed or shutdown requested.
return;
Expand Down Expand Up @@ -791,7 +792,7 @@ impl NetworkMonitor {
log::info!("started");

// Start another thread to initiate requests toward target servers (e.g. health check)
let (request_worker_tx, request_worker_rx) = tokio::sync::mpsc::channel(1000);
let (request_worker_tx, request_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let request_worker = RequestWorker::new(request_worker_rx);
subsys.start(SubsystemBuilder::new("request-worker", |a| {
request_worker.run(a)
Expand Down
1 change: 1 addition & 0 deletions rust/suibase/crates/dtp-daemon/src/workdirs_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl WorkdirsWatcher {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = local_rx.recv().await {
common::mpsc_q_check!(local_rx);
if msg.need_rescan() {
// TODO Implement rescan of all workdirs (assume events were missed).
log::error!("watch_loop() need_rescan (not implemented!)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl RequestWorker {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.netmon_rx.recv().await {
common::mpsc_q_check!(self.netmon_rx);
// Process the message.
self.do_request(msg).await;
} else {
Expand Down
1 change: 1 addition & 0 deletions rust/suibase/crates/dtp-daemon/src/workers/shell_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl ShellWorker {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.event_rx.recv().await {
common::mpsc_q_check!(self.event_rx);
// Process the message.
self.do_exec(msg).await;
} else {
Expand Down
11 changes: 6 additions & 5 deletions rust/suibase/crates/dtp-daemon/src/workers/websocket_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
};

use common::basic_types::{
self, AutoSizeVecMapVec, AutoThread, GenericChannelMsg, Runnable, WorkdirIdx,
self, AutoSizeVecMapVec, AutoThread, GenericChannelMsg, Runnable, WorkdirIdx, MPSC_Q_SIZE,
};

use anyhow::Result;
Expand Down Expand Up @@ -113,9 +113,9 @@ impl Runnable<WebSocketWorkerParams> for WebSocketThread {

// For now, just start a single instance of each SubThread.

let (worker_io_tx, worker_io_rx) = tokio::sync::mpsc::channel(1000);
let (worker_tx_tx, _worker_tx_rx) = tokio::sync::mpsc::channel(1000);
let (worker_rx_tx, _worker_rx_rx) = tokio::sync::mpsc::channel(1000);
let (worker_io_tx, worker_io_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let (worker_tx_tx, _worker_tx_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let (worker_rx_tx, _worker_rx_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);

// Add a reference to all TX channels into the globals.
{
Expand Down Expand Up @@ -155,7 +155,7 @@ impl Runnable<WebSocketWorkerParams> for WebSocketThread {

// Start a single child db_worker thread.
/* Not applicable for now
let (db_worker_tx, db_worker_rx) = tokio::sync::mpsc::channel(1000);
let (db_worker_tx, db_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let db_worker_params = DBWorkerParams::new(
self.params.globals.clone(),
db_worker_rx,
Expand Down Expand Up @@ -242,6 +242,7 @@ impl WebSocketThread {
while !subsys.is_shutdown_requested() {
// Wait for a suibase internal message (not a websocket message!).
if let Some(msg) = event_rx.recv().await {
common::mpsc_q_check!(event_rx);
match msg {
WebSocketWorkerMsg::Generic(msg) => {
// Process the message.
Expand Down
5 changes: 3 additions & 2 deletions rust/suibase/crates/suibase-daemon/src/admin_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl AdminController {

// Instantiate and start the ShellWorker if not already done.
if wd_tracking.shell_worker_handle.is_none() {
let (shell_worker_tx, shell_worker_rx) = tokio::sync::mpsc::channel(100);
let (shell_worker_tx, shell_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
wd_tracking.shell_worker_tx = Some(shell_worker_tx);
let shell_worker = ShellWorker::new(shell_worker_rx, Some(workdir_idx));
let nested = subsys.start(SubsystemBuilder::new("shell-worker", |a| {
Expand Down Expand Up @@ -464,7 +464,7 @@ impl AdminController {
&& workdir_config.is_user_request_start()
&& wd_tracking.websocket_worker_handle.is_none()
{
let (websocket_worker_tx, websocket_worker_rx) = tokio::sync::mpsc::channel(100);
let (websocket_worker_tx, websocket_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);

let websocket_worker_params = EventsWriterWorkerParams::new(
self.globals.clone(),
Expand All @@ -489,6 +489,7 @@ impl AdminController {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.admctrl_rx.recv().await {
common::mpsc_q_check!(self.admctrl_rx);
match msg.event_id {
EVENT_AUDIT => {
self.process_audit_msg(msg).await;
Expand Down
2 changes: 1 addition & 1 deletion rust/suibase/crates/suibase-daemon/src/network_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ impl NetworkMonitor {
log::info!("started");

// Start another thread to initiate requests toward target servers (e.g. health check)
let (request_worker_tx, request_worker_rx) = tokio::sync::mpsc::channel(1000);
let (request_worker_tx, request_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let request_worker = RequestWorker::new(request_worker_rx);
subsys.start(SubsystemBuilder::new("request-worker", |a| {
request_worker.run(a)
Expand Down
5 changes: 3 additions & 2 deletions rust/suibase/crates/suibase-daemon/src/workdirs_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::basic_types::AutoSizeVec;
use common::basic_types::{AutoSizeVec, MPSC_Q_SIZE};

use anyhow::Result;
use tokio_graceful_shutdown::{FutureExt, SubsystemHandle};
Expand Down Expand Up @@ -167,6 +167,7 @@ impl WorkdirsWatcher {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = local_rx.recv().await {
common::mpsc_q_check!(local_rx);
if msg.need_rescan() {
// TODO Implement rescan of all workdirs (assume events were missed).
log::error!("watch_loop() need_rescan (not implemented!)");
Expand Down Expand Up @@ -262,7 +263,7 @@ impl WorkdirsWatcher {

// Use a local channel to process "raw" events from notify-rs and then watch_loop()
// translate them into higher level messages toward the AdminController.
let (local_tx, local_rx) = tokio::sync::mpsc::channel::<notify::event::Event>(1000);
let (local_tx, local_rx) = tokio::sync::mpsc::channel::<notify::event::Event>(MPSC_Q_SIZE);

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};

use common::basic_types::{
self, AutoThread, GenericChannelMsg, GenericRx, GenericTx, Runnable, WorkdirIdx,
self, AutoThread, GenericChannelMsg, GenericRx, GenericTx, Runnable, WorkdirIdx, MPSC_Q_SIZE,
};

use anyhow::Result;
Expand Down Expand Up @@ -90,7 +90,7 @@ impl Runnable<EventsWriterWorkerParams> for EventsWriterThread {
log::info!("started for {}", self.params.workdir_name);

// Start a child websocket_worker thread (in future, more than one might be started).
let (worker_tx, worker_rx) = tokio::sync::mpsc::channel(1000);
let (worker_tx, worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let ws_worker_params = WebSocketWorkerParams::new(
self.params.globals.clone(),
worker_rx,
Expand All @@ -103,7 +103,7 @@ impl Runnable<EventsWriterWorkerParams> for EventsWriterThread {
self.ws_workers_channel.push(worker_tx);

// Start a single child db_worker thread.
let (db_worker_tx, db_worker_rx) = tokio::sync::mpsc::channel(1000);
let (db_worker_tx, db_worker_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);
let db_worker_params = DBWorkerParams::new(
self.params.globals.clone(),
db_worker_rx,
Expand Down Expand Up @@ -183,6 +183,7 @@ impl EventsWriterThread {
while !subsys.is_shutdown_requested() {
// Wait for a suibase internal message (not a websocket message!).
if let Some(msg) = event_rx.recv().await {
common::mpsc_q_check!(event_rx);
// Process the message.
match msg.event_id {
basic_types::EVENT_AUDIT => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl RequestWorker {
while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = self.netmon_rx.recv().await {
common::mpsc_q_check!(self.netmon_rx);
// Process the message.
self.do_request(msg).await;
} else {
Expand Down

0 comments on commit 2209287

Please sign in to comment.