Skip to content

Commit

Permalink
Refactor background tasks with re-useable PollerWorker and PollingTrait.
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Jun 24, 2024
1 parent 4849385 commit 8780312
Show file tree
Hide file tree
Showing 19 changed files with 471 additions and 520 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Some common types depending only on built-in or "standard" types.
// Some common types depending only on built-in or tokio types.
pub type EpochTimestamp = tokio::time::Instant;

// Event Level (matches consts used in the Suibase log Move module)
Expand Down Expand Up @@ -111,3 +111,55 @@ impl std::fmt::Debug for GenericChannelMsg {
.finish()
}
}

pub struct AdminControllerMsg {
// Message sent toward the AdminController from various sources.
pub event_id: AdminControllerEventID,
pub workdir_idx: Option<WorkdirIdx>,
pub data_string: Option<String>,
// Channel to send a one-time response.
pub resp_channel: Option<tokio::sync::oneshot::Sender<String>>,
}

impl AdminControllerMsg {
pub fn new() -> Self {
Self {
event_id: 0,
workdir_idx: None,
data_string: None,
resp_channel: None,
}
}
pub fn data_string(&self) -> Option<String> {
self.data_string.clone()
}
}

impl std::fmt::Debug for AdminControllerMsg {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AdminControllerMsg")
.field("event_id", &self.event_id)
.field("data_string", &self.data_string)
.finish()
}
}

// Events ID
pub type AdminControllerEventID = u8;
pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 128;
pub const EVENT_DEBUG_PRINT: u8 = 129;
pub const EVENT_SHELL_EXEC: u8 = 130;
pub const EVENT_POST_PUBLISH: u8 = 131;

pub type AdminControllerTx = tokio::sync::mpsc::Sender<AdminControllerMsg>;
pub type AdminControllerRx = tokio::sync::mpsc::Receiver<AdminControllerMsg>;

// For struct that can be instantiated with a single parameter.
pub trait Instantiable<P> {
fn new(params: P) -> Self;
}

// For struct that are guaranteed to have a WorkdirIdx field.
pub trait WorkdirContext {
fn workdir_idx(&self) -> WorkdirIdx;
}
2 changes: 2 additions & 0 deletions rust/suibase/crates/common/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// flatten everything under "common::workders" module.
pub use self::shell_worker::*;
pub use self::subscription_tracking::*;
pub use self::poller::*;

mod shell_worker;
mod subscription_tracking;
mod poller;
274 changes: 274 additions & 0 deletions rust/suibase/crates/common/src/workers/poller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// A PollerWorker does:
// - Call into a PollingTrait to do a polling action (e.g. do a CLI command, update globals etc...)
// - Handles AUDIT and UPDATE events. An audit is a *periodic* tentative polling, while
// an UPDATE is an *on-demand* forced polling.
//
// The polling action is done from a tokio task and is auto-restarted on panic!
//
use std::sync::Arc;

use anyhow::Result;

use axum::async_trait;

use tokio::sync::Mutex;
use tokio_graceful_shutdown::{FutureExt, NestedSubsystem, SubsystemBuilder, SubsystemHandle};

use crate::{
basic_types::{
self, remove_generic_event_dups, AutoThread, GenericChannelMsg, GenericRx, GenericTx,
Instantiable, Runnable, WorkdirContext, WorkdirIdx, MPSC_Q_SIZE,
},
mpsc_q_check,
shared_types::WORKDIRS_KEYS,
};

#[async_trait]
pub trait PollingTrait: Send {
async fn update(&mut self);
}

#[allow(dead_code)]
// T: A "trait object" implementing PollingTrait
// P: The parameter needed to instantiate T.
pub struct PollerWorker<T, P> {
params: P,
poller_params: InnerPollerWorkerParams,
poller_worker_handle: Option<NestedSubsystem<Box<dyn std::error::Error + Send + Sync>>>, // Set when the PollerWorker is started.
polling_trait_obj: Arc<Mutex<T>>,
}

impl<T, P> WorkdirContext for PollerWorker<T, P>
where
T: Instantiable<P> + PollingTrait + 'static,
P: WorkdirContext + Clone,
{
fn workdir_idx(&self) -> WorkdirIdx {
self.params.workdir_idx()
}
}

impl<T, P> PollerWorker<T, P>
where
T: Instantiable<P> + PollingTrait + 'static,
P: WorkdirContext + Clone,
{
pub fn new(params: P, subsys: &SubsystemHandle) -> Self {
let (poller_tx, poller_rx) = tokio::sync::mpsc::channel(MPSC_Q_SIZE);

let polling_trait_obj = Arc::new(Mutex::new(T::new(params.clone())));

let poller_params = InnerPollerWorkerParams::new(
polling_trait_obj.clone() as Arc<Mutex<dyn PollingTrait>>,
poller_rx,
poller_tx.clone(),
params.workdir_idx(),
);

let poller_worker = InnerPollerWorker::new(poller_params.clone());

let handle = subsys.start(SubsystemBuilder::new(
format!("poller-{}", params.workdir_idx()),
|a| poller_worker.run(a),
));

Self {
params: params.clone(),
poller_params,
poller_worker_handle: Some(handle),
polling_trait_obj,
}
}

pub fn get_tx_channel(&self) -> GenericTx {
self.poller_params.get_tx_channel()
}

pub fn get_polling_trait_obj(&self) -> Arc<Mutex<T>> {
self.polling_trait_obj.clone()
}
}

#[derive(Clone)]
pub struct InnerPollerWorkerParams {
polling_object: Arc<Mutex<dyn PollingTrait>>,
event_rx: Arc<Mutex<GenericRx>>, // To receive MSPC messages.
event_tx: GenericTx, // To send messages to self.
workdir_idx: WorkdirIdx,
workdir_name: String,
}

impl InnerPollerWorkerParams {
pub fn new(
polling_object: Arc<Mutex<dyn PollingTrait>>,
event_rx: GenericRx,
event_tx: GenericTx,
workdir_idx: WorkdirIdx,
) -> Self {
Self {
polling_object,
event_rx: Arc::new(Mutex::new(event_rx)),
event_tx,
workdir_idx,
workdir_name: WORKDIRS_KEYS[workdir_idx as usize].to_string(),
}
}

pub fn get_tx_channel(&self) -> GenericTx {
self.event_tx.clone()
}
}

pub struct InnerPollerWorker {
auto_thread: AutoThread<PollerWorkerTask, InnerPollerWorkerParams>,
}

impl InnerPollerWorker {
pub fn new(params: InnerPollerWorkerParams) -> Self {
Self {
auto_thread: AutoThread::new(
format!("InnerPollerWorker-{}", params.workdir_name),
params,
),
}
}

pub async fn run(self, subsys: SubsystemHandle) -> Result<()> {
self.auto_thread.run(subsys).await
}
}

struct PollerWorkerTask {
task_name: String,
params: InnerPollerWorkerParams,
last_update_timestamp: Option<tokio::time::Instant>,
}

#[async_trait]
impl Runnable<InnerPollerWorkerParams> for PollerWorkerTask {
fn new(task_name: String, params: InnerPollerWorkerParams) -> Self {
Self {
task_name,
params,
last_update_timestamp: None,
}
}

async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
match self.event_loop(&subsys).cancel_on_shutdown(&subsys).await {
Ok(()) => {
log::info!("{} normal task exit (2)", self.task_name);
Ok(())
}
Err(_cancelled_by_shutdown) => {
log::info!("{} normal task exit (1)", self.task_name);
Ok(())
}
}
}
}

impl PollerWorkerTask {
async fn process_audit_msg(&mut self, msg: GenericChannelMsg) {
// This function is for periodic tentative update.
if msg.event_id != crate::basic_types::EVENT_AUDIT {
log::error!("Unexpected event_id {:?}", msg);
return;
}

// Verify that the workdir_idx is as expected.
if let Some(workdir_idx) = msg.workdir_idx {
if workdir_idx != self.params.workdir_idx {
log::error!(
"Unexpected workdir_idx {:?} (expected {:?})",
workdir_idx,
self.params.workdir_idx
);
return;
}
} else {
log::error!("Missing workdir_idx {:?}", msg);
return;
}

let force = false;
self.callback_update_in_trait_object(force).await;
}

async fn process_update_msg(&mut self, msg: GenericChannelMsg) {
// This function is for "on-demand" forced update.

// Make sure the event_id is EVENT_UPDATE.
if msg.event_id != crate::basic_types::EVENT_UPDATE {
log::error!("Unexpected event_id {:?}", msg);
return;
}

// Verify that the workdir_idx is as expected.
if let Some(workdir_idx) = msg.workdir_idx {
if workdir_idx != self.params.workdir_idx {
log::error!(
"Unexpected workdir_idx {:?} (expected {:?})",
workdir_idx,
self.params.workdir_idx
);
return;
}
} else {
log::error!("Unexpected workdir_idx {:?}", msg);
return;
}

let force = true;
self.callback_update_in_trait_object(force).await;
}

async fn callback_update_in_trait_object(&mut self, force: bool) {
if !force {
// Debounce excessive update request because the callback will typically
// be "expensive" and involve I/O.
if let Some(last_cli_call_timestamp) = self.last_update_timestamp {
if last_cli_call_timestamp.elapsed() < tokio::time::Duration::from_millis(50) {
return;
}
};
}
self.last_update_timestamp = Some(tokio::time::Instant::now());

self.params.polling_object.lock().await.update().await;
}

async fn event_loop(&mut self, subsys: &SubsystemHandle) {
// Take mutable ownership of the event_rx channel as long this task is running.
let event_rx = Arc::clone(&self.params.event_rx);
let mut event_rx = event_rx.lock().await;

// Remove duplicate of EVENT_AUDIT and EVENT_UPDATE in the MPSC queue.
// (handle the case where the task was auto-restarted).
remove_generic_event_dups(&mut event_rx, &self.params.event_tx);
mpsc_q_check!(event_rx); // Just to help verify if the Q unexpectedly "accumulate".

while !subsys.is_shutdown_requested() {
// Wait for a message.
if let Some(msg) = event_rx.recv().await {
mpsc_q_check!(event_rx);
match msg.event_id {
basic_types::EVENT_AUDIT => {
// Periodic processing.
self.process_audit_msg(msg).await;
}
basic_types::EVENT_UPDATE => {
// On-demand/reactive processing.
self.process_update_msg(msg).await;
}
_ => {
log::error!("Unexpected event_id {:?}", msg);
}
}
} else {
// Channel closed or shutdown requested.
return;
}
}
}
}
Loading

0 comments on commit 8780312

Please sign in to comment.