diff --git a/rust/suibase/crates/common/src/basic_types/suibase_basic_types.rs b/rust/suibase/crates/common/src/basic_types/suibase_basic_types.rs index f0e15b5c..09486265 100644 --- a/rust/suibase/crates/common/src/basic_types/suibase_basic_types.rs +++ b/rust/suibase/crates/common/src/basic_types/suibase_basic_types.rs @@ -1,4 +1,4 @@ -// Some common types depending only on built-in or "standard" types. +// Some common types depending only on built-in or tokio types. pub type EpochTimestamp = tokio::time::Instant; // Event Level (matches consts used in the Suibase log Move module) @@ -111,3 +111,55 @@ impl std::fmt::Debug for GenericChannelMsg { .finish() } } + +pub struct AdminControllerMsg { + // Message sent toward the AdminController from various sources. + pub event_id: AdminControllerEventID, + pub workdir_idx: Option, + pub data_string: Option, + // Channel to send a one-time response. + pub resp_channel: Option>, +} + +impl AdminControllerMsg { + pub fn new() -> Self { + Self { + event_id: 0, + workdir_idx: None, + data_string: None, + resp_channel: None, + } + } + pub fn data_string(&self) -> Option { + self.data_string.clone() + } +} + +impl std::fmt::Debug for AdminControllerMsg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AdminControllerMsg") + .field("event_id", &self.event_id) + .field("data_string", &self.data_string) + .finish() + } +} + +// Events ID +pub type AdminControllerEventID = u8; +pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 128; +pub const EVENT_DEBUG_PRINT: u8 = 129; +pub const EVENT_SHELL_EXEC: u8 = 130; +pub const EVENT_POST_PUBLISH: u8 = 131; + +pub type AdminControllerTx = tokio::sync::mpsc::Sender; +pub type AdminControllerRx = tokio::sync::mpsc::Receiver; + +// For struct that can be instantiated with a single parameter. +pub trait Instantiable

{ + fn new(params: P) -> Self; +} + +// For struct that are guaranteed to have a WorkdirIdx field. +pub trait WorkdirContext { + fn workdir_idx(&self) -> WorkdirIdx; +} diff --git a/rust/suibase/crates/common/src/workers/mod.rs b/rust/suibase/crates/common/src/workers/mod.rs index f5dd204d..b2b087ed 100644 --- a/rust/suibase/crates/common/src/workers/mod.rs +++ b/rust/suibase/crates/common/src/workers/mod.rs @@ -3,6 +3,8 @@ // flatten everything under "common::workders" module. pub use self::shell_worker::*; pub use self::subscription_tracking::*; +pub use self::poller::*; mod shell_worker; mod subscription_tracking; +mod poller; diff --git a/rust/suibase/crates/common/src/workers/poller.rs b/rust/suibase/crates/common/src/workers/poller.rs new file mode 100644 index 00000000..4f055560 --- /dev/null +++ b/rust/suibase/crates/common/src/workers/poller.rs @@ -0,0 +1,274 @@ +// A PollerWorker does: +// - Call into a PollingTrait to do a polling action (e.g. do a CLI command, update globals etc...) +// - Handles AUDIT and UPDATE events. An audit is a *periodic* tentative polling, while +// an UPDATE is an *on-demand* forced polling. +// +// The polling action is done from a tokio task and is auto-restarted on panic! +// +use std::sync::Arc; + +use anyhow::Result; + +use axum::async_trait; + +use tokio::sync::Mutex; +use tokio_graceful_shutdown::{FutureExt, NestedSubsystem, SubsystemBuilder, SubsystemHandle}; + +use crate::{ + basic_types::{ + self, remove_generic_event_dups, AutoThread, GenericChannelMsg, GenericRx, GenericTx, + Instantiable, Runnable, WorkdirContext, WorkdirIdx, MPSC_Q_SIZE, + }, + mpsc_q_check, + shared_types::WORKDIRS_KEYS, +}; + +#[async_trait] +pub trait PollingTrait: Send { + async fn update(&mut self); +} + +#[allow(dead_code)] +// T: A "trait object" implementing PollingTrait +// P: The parameter needed to instantiate T. +pub struct PollerWorker { + params: P, + poller_params: InnerPollerWorkerParams, + poller_worker_handle: Option>>, // Set when the PollerWorker is started. + polling_trait_obj: Arc>, +} + +impl WorkdirContext for PollerWorker +where + T: Instantiable

+ PollingTrait + 'static, + P: WorkdirContext + Clone, +{ + fn workdir_idx(&self) -> WorkdirIdx { + self.params.workdir_idx() + } +} + +impl PollerWorker +where + T: Instantiable

+ PollingTrait + 'static, + P: WorkdirContext + Clone, +{ + pub fn new(params: P, subsys: &SubsystemHandle) -> Self { + let (poller_tx, poller_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE); + + let polling_trait_obj = Arc::new(Mutex::new(T::new(params.clone()))); + + let poller_params = InnerPollerWorkerParams::new( + polling_trait_obj.clone() as Arc>, + poller_rx, + poller_tx.clone(), + params.workdir_idx(), + ); + + let poller_worker = InnerPollerWorker::new(poller_params.clone()); + + let handle = subsys.start(SubsystemBuilder::new( + format!("poller-{}", params.workdir_idx()), + |a| poller_worker.run(a), + )); + + Self { + params: params.clone(), + poller_params, + poller_worker_handle: Some(handle), + polling_trait_obj, + } + } + + pub fn get_tx_channel(&self) -> GenericTx { + self.poller_params.get_tx_channel() + } + + pub fn get_polling_trait_obj(&self) -> Arc> { + self.polling_trait_obj.clone() + } +} + +#[derive(Clone)] +pub struct InnerPollerWorkerParams { + polling_object: Arc>, + event_rx: Arc>, // To receive MSPC messages. + event_tx: GenericTx, // To send messages to self. + workdir_idx: WorkdirIdx, + workdir_name: String, +} + +impl InnerPollerWorkerParams { + pub fn new( + polling_object: Arc>, + event_rx: GenericRx, + event_tx: GenericTx, + workdir_idx: WorkdirIdx, + ) -> Self { + Self { + polling_object, + event_rx: Arc::new(Mutex::new(event_rx)), + event_tx, + workdir_idx, + workdir_name: WORKDIRS_KEYS[workdir_idx as usize].to_string(), + } + } + + pub fn get_tx_channel(&self) -> GenericTx { + self.event_tx.clone() + } +} + +pub struct InnerPollerWorker { + auto_thread: AutoThread, +} + +impl InnerPollerWorker { + pub fn new(params: InnerPollerWorkerParams) -> Self { + Self { + auto_thread: AutoThread::new( + format!("InnerPollerWorker-{}", params.workdir_name), + params, + ), + } + } + + pub async fn run(self, subsys: SubsystemHandle) -> Result<()> { + self.auto_thread.run(subsys).await + } +} + +struct PollerWorkerTask { + task_name: String, + params: InnerPollerWorkerParams, + last_update_timestamp: Option, +} + +#[async_trait] +impl Runnable for PollerWorkerTask { + fn new(task_name: String, params: InnerPollerWorkerParams) -> Self { + Self { + task_name, + params, + last_update_timestamp: None, + } + } + + async fn run(mut self, subsys: SubsystemHandle) -> Result<()> { + match self.event_loop(&subsys).cancel_on_shutdown(&subsys).await { + Ok(()) => { + log::info!("{} normal task exit (2)", self.task_name); + Ok(()) + } + Err(_cancelled_by_shutdown) => { + log::info!("{} normal task exit (1)", self.task_name); + Ok(()) + } + } + } +} + +impl PollerWorkerTask { + async fn process_audit_msg(&mut self, msg: GenericChannelMsg) { + // This function is for periodic tentative update. + if msg.event_id != crate::basic_types::EVENT_AUDIT { + log::error!("Unexpected event_id {:?}", msg); + return; + } + + // Verify that the workdir_idx is as expected. + if let Some(workdir_idx) = msg.workdir_idx { + if workdir_idx != self.params.workdir_idx { + log::error!( + "Unexpected workdir_idx {:?} (expected {:?})", + workdir_idx, + self.params.workdir_idx + ); + return; + } + } else { + log::error!("Missing workdir_idx {:?}", msg); + return; + } + + let force = false; + self.callback_update_in_trait_object(force).await; + } + + async fn process_update_msg(&mut self, msg: GenericChannelMsg) { + // This function is for "on-demand" forced update. + + // Make sure the event_id is EVENT_UPDATE. + if msg.event_id != crate::basic_types::EVENT_UPDATE { + log::error!("Unexpected event_id {:?}", msg); + return; + } + + // Verify that the workdir_idx is as expected. + if let Some(workdir_idx) = msg.workdir_idx { + if workdir_idx != self.params.workdir_idx { + log::error!( + "Unexpected workdir_idx {:?} (expected {:?})", + workdir_idx, + self.params.workdir_idx + ); + return; + } + } else { + log::error!("Unexpected workdir_idx {:?}", msg); + return; + } + + let force = true; + self.callback_update_in_trait_object(force).await; + } + + async fn callback_update_in_trait_object(&mut self, force: bool) { + if !force { + // Debounce excessive update request because the callback will typically + // be "expensive" and involve I/O. + if let Some(last_cli_call_timestamp) = self.last_update_timestamp { + if last_cli_call_timestamp.elapsed() < tokio::time::Duration::from_millis(50) { + return; + } + }; + } + self.last_update_timestamp = Some(tokio::time::Instant::now()); + + self.params.polling_object.lock().await.update().await; + } + + async fn event_loop(&mut self, subsys: &SubsystemHandle) { + // Take mutable ownership of the event_rx channel as long this task is running. + let event_rx = Arc::clone(&self.params.event_rx); + let mut event_rx = event_rx.lock().await; + + // Remove duplicate of EVENT_AUDIT and EVENT_UPDATE in the MPSC queue. + // (handle the case where the task was auto-restarted). + remove_generic_event_dups(&mut event_rx, &self.params.event_tx); + mpsc_q_check!(event_rx); // Just to help verify if the Q unexpectedly "accumulate". + + while !subsys.is_shutdown_requested() { + // Wait for a message. + if let Some(msg) = event_rx.recv().await { + mpsc_q_check!(event_rx); + match msg.event_id { + basic_types::EVENT_AUDIT => { + // Periodic processing. + self.process_audit_msg(msg).await; + } + basic_types::EVENT_UPDATE => { + // On-demand/reactive processing. + self.process_update_msg(msg).await; + } + _ => { + log::error!("Unexpected event_id {:?}", msg); + } + } + } else { + // Channel closed or shutdown requested. + return; + } + } + } +} diff --git a/rust/suibase/crates/dtp-daemon/src/admin_controller.rs b/rust/suibase/crates/dtp-daemon/src/admin_controller.rs index 1fa510a6..af2149d8 100644 --- a/rust/suibase/crates/dtp-daemon/src/admin_controller.rs +++ b/rust/suibase/crates/dtp-daemon/src/admin_controller.rs @@ -34,8 +34,6 @@ pub struct AdminController { port_tracking: AutoSizeVec, } -pub type AdminControllerTx = tokio::sync::mpsc::Sender; -pub type AdminControllerRx = tokio::sync::mpsc::Receiver; #[derive(Default)] struct WorkdirTracking { @@ -78,45 +76,6 @@ impl std::fmt::Debug for InputPortTracking { } } -pub struct AdminControllerMsg { - // Message sent toward the AdminController from various sources. - pub event_id: AdminControllerEventID, - pub workdir_idx: Option, - pub data_string: Option, - // Channel to send a one-time response. - pub resp_channel: Option>, -} - -impl AdminControllerMsg { - pub fn new() -> Self { - Self { - event_id: 0, - workdir_idx: None, - data_string: None, - resp_channel: None, - } - } - pub fn data_string(&self) -> Option { - self.data_string.clone() - } -} - -impl std::fmt::Debug for AdminControllerMsg { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AdminControllerMsg") - .field("event_id", &self.event_id) - .field("data_string", &self.data_string) - .finish() - } -} - -// Events ID -pub type AdminControllerEventID = u8; -pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 128; -pub const EVENT_DEBUG_PRINT: u8 = 129; -pub const EVENT_SHELL_EXEC: u8 = 130; -pub const EVENT_POST_PUBLISH: u8 = 131; - impl AdminController { pub fn new( globals: Globals, diff --git a/rust/suibase/crates/dtp-daemon/src/api/api_server.rs b/rust/suibase/crates/dtp-daemon/src/api/api_server.rs index 8f2d6eb9..aeab0e63 100644 --- a/rust/suibase/crates/dtp-daemon/src/api/api_server.rs +++ b/rust/suibase/crates/dtp-daemon/src/api/api_server.rs @@ -12,10 +12,10 @@ use axum::async_trait; use anyhow::Result; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::{admin_controller::AdminControllerTx, shared_types::Globals}; +use crate::shared_types::Globals; use common::{ - basic_types::{AutoThread, Runnable}, + basic_types::{AdminControllerTx, AutoThread, Runnable}, log_safe, }; diff --git a/rust/suibase/crates/dtp-daemon/src/api/impl_dtp_api.rs b/rust/suibase/crates/dtp-daemon/src/api/impl_dtp_api.rs index 8d7d7e0a..86a7f29e 100644 --- a/rust/suibase/crates/dtp-daemon/src/api/impl_dtp_api.rs +++ b/rust/suibase/crates/dtp-daemon/src/api/impl_dtp_api.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::bail; use axum::async_trait; -use common::basic_types::{GenericChannelMsg, ManagedVecU16, WorkdirIdx}; +use common::basic_types::{AdminControllerMsg, AdminControllerTx, GenericChannelMsg, ManagedVecU16, WorkdirIdx}; use dtp_sdk::{Connection, DTP}; use jsonrpsee::core::RpcResult; @@ -11,9 +11,6 @@ use log::info; use tokio::sync::Mutex; -use crate::admin_controller::{ - AdminControllerMsg, AdminControllerTx, EVENT_NOTIF_CONFIG_FILE_CHANGE, -}; use crate::shared_types::{ DTPConnStateDataClient, DTPConnStateDataServer, ExtendedWebSocketWorkerIOMsg, Globals, WebSocketWorkerIOMsg, @@ -189,7 +186,7 @@ impl DtpApiServer for DtpApiImpl { // Inform the AdminController that something changed... let mut msg = AdminControllerMsg::new(); - msg.event_id = EVENT_NOTIF_CONFIG_FILE_CHANGE; + msg.event_id = common::basic_types::EVENT_NOTIF_CONFIG_FILE_CHANGE; msg.data_string = Some(path); // TODO: Implement response to handle errors... but is it really needed here? diff --git a/rust/suibase/crates/dtp-daemon/src/api/impl_general_api.rs b/rust/suibase/crates/dtp-daemon/src/api/impl_general_api.rs index 1c6b09d0..55502593 100644 --- a/rust/suibase/crates/dtp-daemon/src/api/impl_general_api.rs +++ b/rust/suibase/crates/dtp-daemon/src/api/impl_general_api.rs @@ -4,9 +4,8 @@ use anyhow::Result; use jsonrpsee::core::RpcResult; -use common::basic_types::WorkdirIdx; +use common::basic_types::{AdminControllerMsg, AdminControllerTx, WorkdirIdx}; -use crate::admin_controller::{AdminControllerMsg, AdminControllerTx, EVENT_SHELL_EXEC}; use crate::shared_types::Globals; use super::{ @@ -31,7 +30,7 @@ impl GeneralApiImpl { async fn shell_exec(&self, workdir_idx: WorkdirIdx, cmd: String) -> Result { let mut msg = AdminControllerMsg::new(); - msg.event_id = EVENT_SHELL_EXEC; + msg.event_id = common::basic_types::EVENT_SHELL_EXEC; let (tx, rx) = tokio::sync::oneshot::channel(); msg.resp_channel = Some(tx); msg.workdir_idx = Some(workdir_idx); diff --git a/rust/suibase/crates/dtp-daemon/src/api/impl_packages_api.rs b/rust/suibase/crates/dtp-daemon/src/api/impl_packages_api.rs index cd85c0ff..559f5f4d 100644 --- a/rust/suibase/crates/dtp-daemon/src/api/impl_packages_api.rs +++ b/rust/suibase/crates/dtp-daemon/src/api/impl_packages_api.rs @@ -2,12 +2,13 @@ use std::time::SystemTime; use axum::async_trait; +use common::basic_types::AdminControllerTx; use jsonrpsee::core::RpcResult; use jsonrpsee_types::ErrorObjectOwned as RpcError; use chrono::Utc; -use crate::admin_controller::{AdminController, AdminControllerTx}; +use crate::admin_controller::AdminController; use crate::api::RpcSuibaseError; use crate::shared_types::{Globals, GlobalsPackagesConfigST}; diff --git a/rust/suibase/crates/dtp-daemon/src/clock_trigger.rs b/rust/suibase/crates/dtp-daemon/src/clock_trigger.rs index cb389e33..f50b6ed0 100644 --- a/rust/suibase/crates/dtp-daemon/src/clock_trigger.rs +++ b/rust/suibase/crates/dtp-daemon/src/clock_trigger.rs @@ -3,12 +3,9 @@ use anyhow::Result; use axum::async_trait; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::{ - admin_controller::{AdminControllerMsg, AdminControllerTx}, - network_monitor::{NetMonTx, NetworkMonitor}, -}; +use crate::network_monitor::{NetMonTx, NetworkMonitor}; -use common::basic_types::{self, AutoThread, Runnable}; +use common::basic_types::{self, AdminControllerMsg, AdminControllerTx, AutoThread, Runnable}; use tokio::time::{interval, Duration}; diff --git a/rust/suibase/crates/dtp-daemon/src/workdirs_watcher.rs b/rust/suibase/crates/dtp-daemon/src/workdirs_watcher.rs index f2308ebd..80f2f595 100644 --- a/rust/suibase/crates/dtp-daemon/src/workdirs_watcher.rs +++ b/rust/suibase/crates/dtp-daemon/src/workdirs_watcher.rs @@ -1,11 +1,8 @@ -use common::basic_types::AutoSizeVec; +use common::basic_types::{AdminControllerMsg, AdminControllerTx, AutoSizeVec}; use anyhow::Result; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::admin_controller::{ - AdminControllerMsg, AdminControllerTx, EVENT_NOTIF_CONFIG_FILE_CHANGE, -}; use crate::shared_types::GlobalsWorkdirsMT; use common::shared_types::Workdir; @@ -37,7 +34,7 @@ impl WorkdirsWatcher { async fn send_notif_config_file_change(&self, path: String) { log::info!("Sending notif {}", path); let mut msg = AdminControllerMsg::new(); - msg.event_id = EVENT_NOTIF_CONFIG_FILE_CHANGE; + msg.event_id = common::basic_types::EVENT_NOTIF_CONFIG_FILE_CHANGE; msg.data_string = Some(path); let _ = self.admctrl_tx.send(msg).await.map_err(|e| { log::debug!("failed {}", e); diff --git a/rust/suibase/crates/suibase-daemon/src/admin_controller.rs b/rust/suibase/crates/suibase-daemon/src/admin_controller.rs index 8f7555bb..ff7067b4 100644 --- a/rust/suibase/crates/suibase-daemon/src/admin_controller.rs +++ b/rust/suibase/crates/suibase-daemon/src/admin_controller.rs @@ -9,8 +9,8 @@ use crate::proxy_server::ProxyServer; use crate::shared_types::{Globals, InputPort, WorkdirUserConfig, WORKDIR_IDX_LOCALNET}; use crate::workdirs_watcher::WorkdirsWatcher; use crate::workers::{ - CliPollerParams, CliPollerWorker, EventsWriterWorker, EventsWriterWorkerParams, - PackagesPollerParams, PackagesPollerWorker, + CliPoller, CliPollerParams, EventsWriterWorker, EventsWriterWorkerParams, PackagesPoller, + PackagesPollerParams, }; use common::workers::ShellWorker; @@ -49,9 +49,6 @@ pub struct AdminController { port_tracking: AutoSizeVec, } -pub type AdminControllerTx = tokio::sync::mpsc::Sender; -pub type AdminControllerRx = tokio::sync::mpsc::Receiver; - #[derive(Default)] struct WorkdirTracking { last_read_config: Option, @@ -62,11 +59,9 @@ struct WorkdirTracking { events_worker_tx: Option, events_worker_handle: Option>>, // Set when the EventsWriterWorker is started. - cli_poller_tx: Option, - cli_poller_handle: Option>>, // Set when the CliPollerWorker is started. + cli_poller: Option, - packages_poller_tx: Option, - packages_poller_handle: Option>>, // Set when the PackagesPollerWorker is started. + packages_poller: Option, process_watchdog_last_check_timestamp: Option, process_watchdog_last_recovery_timestamp: Option, @@ -96,45 +91,6 @@ impl std::fmt::Debug for InputPortTracking { } } -pub struct AdminControllerMsg { - // Message sent toward the AdminController from various sources. - pub event_id: AdminControllerEventID, - pub workdir_idx: Option, - pub data_string: Option, - // Channel to send a one-time response. - pub resp_channel: Option>, -} - -impl AdminControllerMsg { - pub fn new() -> Self { - Self { - event_id: 0, - workdir_idx: None, - data_string: None, - resp_channel: None, - } - } - pub fn data_string(&self) -> Option { - self.data_string.clone() - } -} - -impl std::fmt::Debug for AdminControllerMsg { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AdminControllerMsg") - .field("event_id", &self.event_id) - .field("data_string", &self.data_string) - .finish() - } -} - -// Events ID -pub type AdminControllerEventID = u8; -pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 128; -pub const EVENT_DEBUG_PRINT: u8 = 129; -pub const EVENT_SHELL_EXEC: u8 = 130; -pub const EVENT_POST_PUBLISH: u8 = 131; - impl AdminController { pub fn new( globals: Globals, @@ -238,29 +194,8 @@ impl AdminController { } } - if let Some(worker_tx) = wd_tracking.cli_poller_tx.as_ref() { - match worker_tx.try_send(worker_msg.clone()) { - Ok(()) => {} - Err(e) => { - log_safe!(format!( - "try_send EVENT_AUDIT forward to cli poller failed: {}", - e - )); - } - } - } - - if let Some(worker_tx) = wd_tracking.packages_poller_tx.as_ref() { - match worker_tx.try_send(worker_msg.clone()) { - Ok(()) => {} - Err(e) => { - log_safe!(format!( - "try_send EVENT_AUDIT forward to packages poller failed: {}", - e - )); - } - } - } + Self::send_msg_to_cli_poller(wd_tracking, worker_msg.clone()).await; + Self::send_msg_to_packages_poller(wd_tracking, worker_msg.clone()).await; } // Check for potential need for local process restart/recovery. @@ -268,14 +203,15 @@ impl AdminController { } async fn send_msg_to_cli_poller(wd_tracking: &WorkdirTracking, msg: GenericChannelMsg) { - if let Some(worker_tx) = wd_tracking.cli_poller_tx.as_ref() { + if let Some(poller) = wd_tracking.cli_poller.as_ref() { let workdir_idx = msg.workdir_idx; - match worker_tx.try_send(msg) { + let event_id = msg.event_id; + match poller.get_tx_channel().try_send(msg) { Ok(()) => {} Err(e) => { log_safe!(format!( - "try_send to {:?} cli poller failed: {}", - workdir_idx, e + "try_send event id={:?} to {:?} cli poller failed: {}", + event_id, workdir_idx, e )); } } @@ -283,14 +219,15 @@ impl AdminController { } async fn send_msg_to_packages_poller(wd_tracking: &WorkdirTracking, msg: GenericChannelMsg) { - if let Some(worker_tx) = wd_tracking.packages_poller_tx.as_ref() { + if let Some(poller) = wd_tracking.packages_poller.as_ref() { let workdir_idx = msg.workdir_idx; - match worker_tx.try_send(msg) { + let event_id = msg.event_id; + match poller.get_tx_channel().try_send(msg) { Ok(()) => {} Err(e) => { log_safe!(format!( - "try_send to {:?} packages poller failed: {}", - workdir_idx, e + "try_send event id={:?} to {:?} packages poller failed: {}", + event_id, workdir_idx, e )); } } @@ -809,47 +746,28 @@ impl AdminController { } // Start a CLI poller. - if wd_tracking.cli_poller_handle.is_none() { - let (poller_tx, poller_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE); + if wd_tracking.cli_poller.is_none() { let params = CliPollerParams::new( self.globals.clone(), - poller_rx, - poller_tx.clone(), self.admctrl_tx.clone(), workdir_idx, ); - wd_tracking.cli_poller_tx = Some(poller_tx); - - let poller = CliPollerWorker::new(params); - let nested = subsys.start(SubsystemBuilder::new( - format!("cli-poller-{}", workdir_idx), - |a| poller.run(a), - )); - wd_tracking.cli_poller_handle = Some(nested); + + let poller = CliPoller::new(params, &subsys); + + wd_tracking.cli_poller = Some(poller); } // Start a packages poller. - if wd_tracking.packages_poller_handle.is_none() { - let (poller_tx, poller_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE); - + if wd_tracking.packages_poller.is_none() { let params = PackagesPollerParams::new( self.globals.clone(), - poller_rx, - poller_tx.clone(), - self.admctrl_tx.clone(), wd_tracking.events_worker_tx.clone(), workdir_idx, ); - wd_tracking.packages_poller_tx = Some(poller_tx); - - let poller = PackagesPollerWorker::new(params); - let nested = subsys.start(SubsystemBuilder::new( - format!("packages-poller-{}", workdir_idx), - |a| poller.run(a), - )); - wd_tracking.packages_poller_handle = Some(nested); - } else { - log::error!("Missing events_worker_tx for packages poller"); + + let poller = PackagesPoller::new(params, &subsys); + wd_tracking.packages_poller = Some(poller); } } diff --git a/rust/suibase/crates/suibase-daemon/src/api/api_server.rs b/rust/suibase/crates/suibase-daemon/src/api/api_server.rs index 0b8bb6ec..3bac2618 100644 --- a/rust/suibase/crates/suibase-daemon/src/api/api_server.rs +++ b/rust/suibase/crates/suibase-daemon/src/api/api_server.rs @@ -13,10 +13,10 @@ use axum::async_trait; use anyhow::Result; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::{admin_controller::AdminControllerTx, shared_types::Globals}; +use crate::shared_types::Globals; use common::{ - basic_types::{AutoThread, Runnable}, + basic_types::{AdminControllerTx, AutoThread, Runnable}, log_safe, }; @@ -76,7 +76,7 @@ impl Runnable for APIServerThread { } async fn run(self, subsys: SubsystemHandle) -> Result<()> { - log_safe!(format!("{} started", self.name )); + log_safe!(format!("{} started", self.name)); match self.event_loop(&subsys).cancel_on_shutdown(&subsys).await { Ok(_) => { diff --git a/rust/suibase/crates/suibase-daemon/src/api/impl_general_api.rs b/rust/suibase/crates/suibase-daemon/src/api/impl_general_api.rs index 779bee40..c3d8354a 100644 --- a/rust/suibase/crates/suibase-daemon/src/api/impl_general_api.rs +++ b/rust/suibase/crates/suibase-daemon/src/api/impl_general_api.rs @@ -1,8 +1,9 @@ use axum::async_trait; +use common::basic_types::AdminControllerTx; use jsonrpsee::core::RpcResult; -use crate::admin_controller::{AdminController, AdminControllerTx}; +use crate::admin_controller::AdminController; use crate::shared_types::{Globals, GlobalsWorkdirsST}; use super::{ diff --git a/rust/suibase/crates/suibase-daemon/src/api/impl_packages_api.rs b/rust/suibase/crates/suibase-daemon/src/api/impl_packages_api.rs index eb2167ed..2d6e25c8 100644 --- a/rust/suibase/crates/suibase-daemon/src/api/impl_packages_api.rs +++ b/rust/suibase/crates/suibase-daemon/src/api/impl_packages_api.rs @@ -4,13 +4,14 @@ use axum::async_trait; use anyhow::Result; +use common::basic_types::AdminControllerTx; use common::log_safe; use jsonrpsee::core::RpcResult; use jsonrpsee_types::ErrorObjectOwned as RpcError; use chrono::Utc; -use crate::admin_controller::{AdminController, AdminControllerTx}; +use crate::admin_controller::AdminController; use crate::api::RpcSuibaseError; use crate::shared_types::{Globals, GlobalsWorkdirsST}; diff --git a/rust/suibase/crates/suibase-daemon/src/api/impl_proxy_api.rs b/rust/suibase/crates/suibase-daemon/src/api/impl_proxy_api.rs index 19e852f5..66954a24 100644 --- a/rust/suibase/crates/suibase-daemon/src/api/impl_proxy_api.rs +++ b/rust/suibase/crates/suibase-daemon/src/api/impl_proxy_api.rs @@ -4,11 +4,8 @@ use axum::async_trait; use jsonrpsee::core::RpcResult; -use crate::admin_controller::{ - AdminControllerMsg, AdminControllerTx, EVENT_NOTIF_CONFIG_FILE_CHANGE, -}; use crate::shared_types::{GlobalsProxyMT, ServerStats}; -use common::basic_types::{SafeUuid, TargetServerIdx}; +use common::basic_types::{AdminControllerMsg, AdminControllerTx, SafeUuid, TargetServerIdx}; use super::{InfoResponse, ProxyApiServer, VersionedEq}; use super::{LinkStats, LinksResponse, LinksSummary, RpcInputError}; @@ -497,7 +494,7 @@ impl ProxyApiServer for ProxyApiImpl { if debug { // Communicate with AdminController to append its own debug state. let mut msg = AdminControllerMsg::new(); - msg.event_id = crate::admin_controller::EVENT_DEBUG_PRINT; + msg.event_id = common::basic_types::EVENT_DEBUG_PRINT; let (tx, rx) = tokio::sync::oneshot::channel(); msg.resp_channel = Some(tx); if (self.admctrl_tx.send(msg).await).is_ok() { @@ -540,7 +537,7 @@ impl ProxyApiServer for ProxyApiImpl { // Inform the AdminController that something changed... let mut msg = AdminControllerMsg::new(); - msg.event_id = EVENT_NOTIF_CONFIG_FILE_CHANGE; + msg.event_id = common::basic_types::EVENT_NOTIF_CONFIG_FILE_CHANGE; msg.data_string = Some(path); // TODO: Implement response to handle errors... but is it really needed here? diff --git a/rust/suibase/crates/suibase-daemon/src/clock_trigger.rs b/rust/suibase/crates/suibase-daemon/src/clock_trigger.rs index 880ccdeb..a47e23a4 100644 --- a/rust/suibase/crates/suibase-daemon/src/clock_trigger.rs +++ b/rust/suibase/crates/suibase-daemon/src/clock_trigger.rs @@ -3,12 +3,9 @@ use anyhow::Result; use axum::async_trait; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::{ - admin_controller::{AdminControllerMsg, AdminControllerTx}, - network_monitor::{NetMonTx, NetworkMonitor}, -}; +use crate::network_monitor::{NetMonTx, NetworkMonitor}; -use common::basic_types::{self, AutoThread, Runnable}; +use common::basic_types::{self, AdminControllerMsg, AdminControllerTx, AutoThread, Runnable}; use tokio::time::{interval, Duration}; diff --git a/rust/suibase/crates/suibase-daemon/src/workdirs_watcher.rs b/rust/suibase/crates/suibase-daemon/src/workdirs_watcher.rs index 4a8d3776..6726ad2e 100644 --- a/rust/suibase/crates/suibase-daemon/src/workdirs_watcher.rs +++ b/rust/suibase/crates/suibase-daemon/src/workdirs_watcher.rs @@ -1,11 +1,8 @@ -use common::basic_types::{AutoSizeVec, MPSC_Q_SIZE}; +use common::basic_types::{AdminControllerMsg, AdminControllerTx, AutoSizeVec, MPSC_Q_SIZE}; use anyhow::Result; use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; -use crate::admin_controller::{ - AdminControllerMsg, AdminControllerTx, EVENT_NOTIF_CONFIG_FILE_CHANGE, -}; use crate::shared_types::{GlobalsWorkdirsMT, Workdir}; use notify::RecursiveMode; @@ -36,7 +33,7 @@ impl WorkdirsWatcher { async fn send_notif_config_file_change(&self, path: String) { log::info!("Sending notif {}", path); let mut msg = AdminControllerMsg::new(); - msg.event_id = EVENT_NOTIF_CONFIG_FILE_CHANGE; + msg.event_id = common::basic_types::EVENT_NOTIF_CONFIG_FILE_CHANGE; msg.data_string = Some(path); let _ = self.admctrl_tx.send(msg).await.map_err(|e| { log::debug!("failed {}", e); diff --git a/rust/suibase/crates/suibase-daemon/src/workers/cli_poller.rs b/rust/suibase/crates/suibase-daemon/src/workers/cli_poller.rs index df2e3d0e..fedbf5b2 100644 --- a/rust/suibase/crates/suibase-daemon/src/workers/cli_poller.rs +++ b/rust/suibase/crates/suibase-daemon/src/workers/cli_poller.rs @@ -1,203 +1,94 @@ -// Child thread of admin_controller +// Child task of admin_controller // // One instance per workdir. // // Responsible to: // - Periodically and on-demand do "status" CLI commands and update globals. // -// The thread is auto-restart in case of panic. - -use std::sync::Arc; +// The task is auto-restart in case of panic. +// +// Design: +// - Define a PollingTraitObject that does the "specialize" polling. +// - Uses a PollerWorker for most of background task/event re-useable logic. +// use crate::{ - admin_controller::{AdminController, AdminControllerTx}, + admin_controller::AdminController, api::{StatusService, Versioned, WorkdirStatusResponse}, shared_types::{Globals, WORKDIRS_KEYS}, }; -use anyhow::Result; - use axum::async_trait; use common::{ - basic_types::{ - self, AutoThread, GenericChannelMsg, GenericRx, GenericTx, Runnable, WorkdirIdx, - }, - mpsc_q_check, + basic_types::{AdminControllerTx, GenericTx, Instantiable, WorkdirContext, WorkdirIdx}, + workers::PollerWorker, }; -use tokio::sync::Mutex; -use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; +use common::workers::PollingTrait; -use common::basic_types::remove_generic_event_dups; +use tokio_graceful_shutdown::SubsystemHandle; #[derive(Clone)] pub struct CliPollerParams { globals: Globals, - event_rx: Arc>, // To receive MSPC messages. - event_tx: GenericTx, // To send messages to self. - admctrl_tx: AdminControllerTx, // To send messages to parent + admctrl_tx: AdminControllerTx, // For exec shell messages workdir_idx: WorkdirIdx, - workdir_name: String, +} + +impl WorkdirContext for CliPollerParams { + fn workdir_idx(&self) -> WorkdirIdx { + self.workdir_idx + } } impl CliPollerParams { - pub fn new( - globals: Globals, - event_rx: GenericRx, - event_tx: GenericTx, - admctrl_tx: AdminControllerTx, - workdir_idx: WorkdirIdx, - ) -> Self { + pub fn new(globals: Globals, admctrl_tx: AdminControllerTx, workdir_idx: WorkdirIdx) -> Self { Self { globals, - event_rx: Arc::new(Mutex::new(event_rx)), - event_tx, admctrl_tx, workdir_idx, - workdir_name: WORKDIRS_KEYS[workdir_idx as usize].to_string(), } } } -pub struct CliPollerWorker { - auto_thread: AutoThread, +pub struct CliPoller { + // "Glue" the specialized PollingTraitObject with its parameters. + // The worker does all the background task/events handling. + poller: PollerWorker, } -impl CliPollerWorker { - pub fn new(params: CliPollerParams) -> Self { - Self { - auto_thread: AutoThread::new(format!("CliPollerWorker-{}", params.workdir_idx), params), - } - } - - pub async fn run(self, subsys: SubsystemHandle) -> Result<()> { - self.auto_thread.run(subsys).await - } -} - -struct CliPollerWorkerTask { - task_name: String, +pub struct PollingTraitObject { params: CliPollerParams, - last_update_timestamp: Option, } #[async_trait] -impl Runnable for CliPollerWorkerTask { - fn new(task_name: String, params: CliPollerParams) -> Self { - Self { - task_name, - params, - last_update_timestamp: None, - } - } - - async fn run(mut self, subsys: SubsystemHandle) -> Result<()> { - match self.event_loop(&subsys).cancel_on_shutdown(&subsys).await { - Ok(()) => { - log::info!("{} normal thread exit (2)", self.task_name); - Ok(()) - } - Err(_cancelled_by_shutdown) => { - log::info!("{} normal thread exit (1)", self.task_name); - Ok(()) - } - } +impl PollingTrait for PollingTraitObject { + // This is called by the PollerWorker task. + async fn update(&mut self) { + self.update_globals_workdir_status().await; } } -impl CliPollerWorkerTask { - async fn process_audit_msg(&mut self, msg: GenericChannelMsg) { - // This function takes care of periodic operation synchronizing - // between the CLI state and the globals. - - if msg.event_id != basic_types::EVENT_AUDIT { - log::error!("Unexpected event_id {:?}", msg); - return; - } - - // Verify that the workdir_idx is as expected. - if let Some(workdir_idx) = msg.workdir_idx { - if workdir_idx != self.params.workdir_idx { - log::error!( - "Unexpected workdir_idx {:?} (expected {:?})", - workdir_idx, - self.params.workdir_idx - ); - return; - } - } else { - log::error!("Missing workdir_idx {:?}", msg); - return; - } - - // Simply convert the periodic audit into an update, but do - // not force it. - let force = false; - self.update_globals_workdir_status(force).await; +// This allow the PollerWorker to instantiate the PollingTraitObject. +impl Instantiable for PollingTraitObject { + fn new(params: CliPollerParams) -> Self { + Self { params } } +} - async fn process_update_msg(&mut self, msg: GenericChannelMsg) { - // This function takes care of synching from Suibase CLI to the globals. - - // Make sure the event_id is EVENT_UPDATE. - if msg.event_id != basic_types::EVENT_UPDATE { - log::error!("Unexpected event_id {:?}", msg); - return; - } - - // Verify that the workdir_idx is as expected. - if let Some(workdir_idx) = msg.workdir_idx { - if workdir_idx != self.params.workdir_idx { - log::error!( - "Unexpected workdir_idx {:?} (expected {:?})", - workdir_idx, - self.params.workdir_idx - ); - return; - } - } else { - log::error!("Unexpected workdir_idx {:?}", msg); - return; - } - - let force = true; - self.update_globals_workdir_status(force).await; +impl CliPoller { + pub fn new(params: CliPollerParams, subsys: &SubsystemHandle) -> Self { + let poller = + PollerWorker::::new(params.clone(), subsys); + Self { poller } } - async fn event_loop(&mut self, subsys: &SubsystemHandle) { - // Take mutable ownership of the event_rx channel as long this thread is running. - let event_rx = Arc::clone(&self.params.event_rx); - let mut event_rx = event_rx.lock().await; - - // Remove duplicate of EVENT_AUDIT and EVENT_UPDATE in the event_rx queue. - // (handle the case where the task was auto-restarted). - remove_generic_event_dups(&mut event_rx, &self.params.event_tx); - mpsc_q_check!(event_rx); // Just to help verify if the Q unexpectedly "accumulate". - - while !subsys.is_shutdown_requested() { - // Wait for a message. - if let Some(msg) = event_rx.recv().await { - common::mpsc_q_check!(event_rx); - match msg.event_id { - basic_types::EVENT_AUDIT => { - // Periodic processing. - self.process_audit_msg(msg).await; - } - basic_types::EVENT_UPDATE => { - // On-demand/reactive processing. - self.process_update_msg(msg).await; - } - _ => { - log::error!("Unexpected event_id {:?}", msg); - } - } - } else { - // Channel closed or shutdown requested. - return; - } - } + pub fn get_tx_channel(&self) -> GenericTx { + self.poller.get_tx_channel() } +} +impl PollingTraitObject { fn convert_status_cmd_resp_to_status_response( cmd_response: String, workdir_name: String, @@ -396,19 +287,9 @@ impl CliPollerWorkerTask { (first_line_parsed, asui_selection) } - async fn update_globals_workdir_status(&mut self, force: bool) { - if !force { - // Debounce excessive refresh request on short period of time. - if let Some(last_cli_call_timestamp) = self.last_update_timestamp { - if last_cli_call_timestamp.elapsed() < tokio::time::Duration::from_millis(50) { - return; - } - }; - } - self.last_update_timestamp = Some(tokio::time::Instant::now()); - - let workdir = &self.params.workdir_name; + async fn update_globals_workdir_status(&mut self) { let workdir_idx = self.params.workdir_idx; + let workdir = WORKDIRS_KEYS[workdir_idx as usize].to_string(); // Try to refresh the globals and return the latest UUID. let mut resp = WorkdirStatusResponse::new(); diff --git a/rust/suibase/crates/suibase-daemon/src/workers/packages_poller.rs b/rust/suibase/crates/suibase-daemon/src/workers/packages_poller.rs index 4e1bfcfe..8e1bbeab 100644 --- a/rust/suibase/crates/suibase-daemon/src/workers/packages_poller.rs +++ b/rust/suibase/crates/suibase-daemon/src/workers/packages_poller.rs @@ -1,4 +1,4 @@ -// Child thread of admin_controller +// Child task of admin_controller // // One instance per workdir. // @@ -6,209 +6,100 @@ // - Periodically and on-demand check published packages // under ~/suibase/workdirs and update globals. // -// The thread is auto-restart in case of panic. - -use std::sync::Arc; +// The task is auto-restart in case of panic. use anyhow::anyhow; -use common::{basic_types::WorkdirIdx, log_safe}; +use common::{ + basic_types::{Instantiable, WorkdirContext, WorkdirIdx}, + log_safe, + shared_types::WORKDIRS_KEYS, + workers::{PollerWorker, PollingTrait}, +}; use std::collections::HashSet; use std::path::PathBuf; use crate::{ - admin_controller::AdminControllerTx, api::{PackageInstance, SuiObjectInstance, SuiObjectType}, - shared_types::{Globals, PackagePath, WORKDIRS_KEYS}, + shared_types::{Globals, PackagePath}, }; use anyhow::Result; use axum::async_trait; -use common::{ - basic_types::{self, AutoThread, GenericChannelMsg, GenericRx, GenericTx, Runnable}, - mpsc_q_check, -}; - -use tokio::sync::Mutex; -use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; +use common::basic_types::{self, GenericChannelMsg, GenericTx}; -use common::basic_types::remove_generic_event_dups; +use tokio_graceful_shutdown::SubsystemHandle; #[derive(Clone)] pub struct PackagesPollerParams { globals: Globals, - event_rx: Arc>, // To receive MSPC messages. - event_tx: GenericTx, // To send messages to self. - admctrl_tx: AdminControllerTx, // To send messages to parent sui_events_worker_tx: Option, // To send messages to related Sui event worker. workdir_idx: WorkdirIdx, - workdir_name: String, +} + +impl WorkdirContext for PackagesPollerParams { + fn workdir_idx(&self) -> WorkdirIdx { + self.workdir_idx + } } impl PackagesPollerParams { pub fn new( globals: Globals, - event_rx: GenericRx, - event_tx: GenericTx, - admctrl_tx: AdminControllerTx, sui_events_worker_tx: Option, workdir_idx: WorkdirIdx, ) -> Self { Self { globals, - event_rx: Arc::new(Mutex::new(event_rx)), - event_tx, - admctrl_tx, sui_events_worker_tx, workdir_idx, - workdir_name: WORKDIRS_KEYS[workdir_idx as usize].to_string(), } } } -pub struct PackagesPollerWorker { - auto_thread: AutoThread, -} - -impl PackagesPollerWorker { - pub fn new(params: PackagesPollerParams) -> Self { - Self { - auto_thread: AutoThread::new( - format!("PackagesPollerWorker-{}", params.workdir_idx), - params, - ), - } - } - - pub async fn run(self, subsys: SubsystemHandle) -> Result<()> { - self.auto_thread.run(subsys).await - } +pub struct PackagesPoller { + // "Glue" the specialized PollingTraitObject with its parameters. + // The worker does all the background task/events handling. + poller: PollerWorker, } -struct PackagesPollerWorkerTask { - task_name: String, +pub struct PollingTraitObject { params: PackagesPollerParams, - last_update_timestamp: Option, } #[async_trait] -impl Runnable for PackagesPollerWorkerTask { - fn new(task_name: String, params: PackagesPollerParams) -> Self { - Self { - task_name, - params, - last_update_timestamp: None, - } +impl PollingTrait for PollingTraitObject { + // This is called by the PollerWorker task. + async fn update(&mut self) { + self.update_globals_workdir_packages().await; } +} - async fn run(mut self, subsys: SubsystemHandle) -> Result<()> { - match self.event_loop(&subsys).cancel_on_shutdown(&subsys).await { - Ok(()) => { - log::info!("{} normal thread exit (2)", self.task_name); - Ok(()) - } - Err(_cancelled_by_shutdown) => { - log::info!("{} normal thread exit (1)", self.task_name); - Ok(()) - } - } +// This allow the PollerWorker to instantiate the PollingTraitObject. +impl Instantiable for PollingTraitObject { + fn new(params: PackagesPollerParams) -> Self { + Self { params } } } -impl PackagesPollerWorkerTask { - async fn process_audit_msg(&mut self, msg: GenericChannelMsg) { - // This function takes care of periodic operation synchronizing - // between the filesystem published package data and the globals. - - if msg.event_id != basic_types::EVENT_AUDIT { - log::error!("Unexpected event_id {:?}", msg); - return; - } - - // Verify that the workdir_idx is as expected. - if let Some(workdir_idx) = msg.workdir_idx { - if workdir_idx != self.params.workdir_idx { - log::error!( - "Unexpected workdir_idx {:?} (expected {:?})", - workdir_idx, - self.params.workdir_idx - ); - return; - } - } else { - log::error!("Missing workdir_idx {:?}", msg); - return; - } - - // Simply convert the periodic audit into an update, but do - // not force it. - let force = false; - self.update_globals_workdir_packages(force).await; +impl PackagesPoller { + pub fn new(params: PackagesPollerParams, subsys: &SubsystemHandle) -> Self { + let poller = + PollerWorker::::new(params.clone(), subsys); + Self { poller } } - async fn process_update_msg(&mut self, msg: GenericChannelMsg) { - // This function takes care of synchronizing between the filesystem - // published package data and the globals. - - // Make sure the event_id is EVENT_UPDATE. - if msg.event_id != basic_types::EVENT_UPDATE { - log::error!("Unexpected event_id {:?}", msg); - return; - } - - // Verify that the workdir_idx is as expected. - if let Some(workdir_idx) = msg.workdir_idx { - if workdir_idx != self.params.workdir_idx { - log::error!( - "Unexpected workdir_idx {:?} (expected {:?})", - workdir_idx, - self.params.workdir_idx - ); - return; - } - } else { - log::error!("Unexpected workdir_idx {:?}", msg); - return; - } - - let force = true; - self.update_globals_workdir_packages(force).await; + pub fn get_tx_channel(&self) -> GenericTx { + self.poller.get_tx_channel() } +} - async fn event_loop(&mut self, subsys: &SubsystemHandle) { - // Take mutable ownership of the event_rx channel as long this thread is running. - let event_rx = Arc::clone(&self.params.event_rx); - let mut event_rx = event_rx.lock().await; - - // Remove duplicate of EVENT_AUDIT and EVENT_UPDATE in the event_rx queue. - // (handle the case where the task was auto-restarted). - remove_generic_event_dups(&mut event_rx, &self.params.event_tx); - mpsc_q_check!(event_rx); // Just to help verify if the Q unexpectedly "accumulate". - - while !subsys.is_shutdown_requested() { - // Wait for a message. - if let Some(msg) = event_rx.recv().await { - common::mpsc_q_check!(event_rx); - match msg.event_id { - basic_types::EVENT_AUDIT => { - // Periodic processing. - self.process_audit_msg(msg).await; - } - basic_types::EVENT_UPDATE => { - // On-demand/reactive processing. - self.process_update_msg(msg).await; - } - _ => { - log::error!("Unexpected event_id {:?}", msg); - } - } - } else { - // Channel closed or shutdown requested. - return; - } - } - } +struct PackagesPollerWorkerTask { + task_name: String, + params: PackagesPollerParams, +} +impl PollingTraitObject { async fn create_package_instance( &self, package_path: PackagePath, @@ -298,19 +189,9 @@ impl PackagesPollerWorkerTask { Ok(ret_value) } - async fn update_globals_workdir_packages(&mut self, force: bool) { - // Debounce excessive refresh request on short period of time. - if !force { - if let Some(last_update_timestamp) = self.last_update_timestamp { - if last_update_timestamp.elapsed() < tokio::time::Duration::from_millis(50) { - return; - } - }; - } - self.last_update_timestamp = Some(tokio::time::Instant::now()); - - let workdir = &self.params.workdir_name; + async fn update_globals_workdir_packages(&mut self) { let workdir_idx = self.params.workdir_idx; + let workdir = WORKDIRS_KEYS[workdir_idx as usize].to_string(); // Multiple steps for efficiency: // Step 1) Read the Filesystem to get all the published PackagePath.