From 0015e748117f407a63502711e701eab5a0188c39 Mon Sep 17 00:00:00 2001
From: Aayush Agrawal
Date: Thu, 17 Aug 2023 17:44:00 +0530
Subject: [PATCH] full scrape cache

---
 conf/default.toml                |  3 ++
 src/app.rs                       |  6 +--
 src/config/mod.rs                |  8 +++
 src/models/torrent.rs            |  5 +-
 src/models/tracker.rs            | 57 ++++++++++++++++++----
 src/servers/cache/full_scrape.rs | 83 ++++++++++++++++++++++++++++++++
 src/servers/cache/mod.rs         | 75 +++++++++++++++++++++++++++++
 src/servers/http/handler.rs      | 38 ++++++++++-----
 src/servers/mod.rs               | 13 +++++
 src/storage/memory.rs            | 20 ++++----
 src/storage/mod.rs               |  8 ++-
 src/storage/redis.rs             |  7 ++-
 src/worker/mod.rs                | 32 +++++++-----
 src/worker/tasks/announce.rs     |  8 +--
 src/worker/tasks/full_scrape.rs  | 32 ++++++++++++
 src/worker/tasks/mod.rs          |  5 +-
 src/worker/tasks/scrape.rs       | 16 +++---
 17 files changed, 348 insertions(+), 68 deletions(-)
 create mode 100644 src/servers/cache/full_scrape.rs
 create mode 100644 src/servers/cache/mod.rs
 create mode 100644 src/worker/tasks/full_scrape.rs

diff --git a/conf/default.toml b/conf/default.toml
index 1419564..bda69e3 100644
--- a/conf/default.toml
+++ b/conf/default.toml
@@ -90,6 +90,9 @@ allow_udp_scrape = true
 # Set to true to allow a full scrape of the tracker (scraping all torrents).
 allow_full_scrape = false
 
+# The duration in seconds for which a full scrape response is cached.
+full_scrape_cache_ttl = 600
+
 # The maximum number of torrents to scrape in a single request.
 max_multi_scrape_count = 64
 
diff --git a/src/app.rs b/src/app.rs
index 3ec6c4c..6e3423b 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -17,11 +17,7 @@ pub fn start(config: TSConfig, stop_signal_rx: StopSignalRx) -> Vec<...>
diff --git a/src/config/mod.rs b/src/config/mod.rs
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ ... @@
     pub fn max_multi_scrape_count(&self) -> u32 {
         self.tracker.max_multi_scrape_count
     }
+
+    pub fn full_scrape_cache_ttl(&self) -> Duration {
+        self.tracker.full_scrape_cache_ttl
+    }
 }
 
 #[derive(Debug, Default, Clone)]
diff --git a/src/models/torrent.rs b/src/models/torrent.rs
index 52d1a9d..e60f0d2 100644
--- a/src/models/torrent.rs
+++ b/src/models/torrent.rs
@@ -4,7 +4,7 @@ use indexmap::IndexMap;
 use serde::{Deserialize, Serialize};
 
 use super::{
-    common::{PeerId, PEER_ID_LENGTH},
+    common::{InfoHash, PeerId, PEER_ID_LENGTH},
     peer::{Peer, PeerType},
 };
 
@@ -210,3 +210,6 @@ impl AsRef<[u8]> for PeerIdKey {
         &self.0
     }
 }
+
+pub type TorrentSwarmDict = IndexMap<InfoHash, TorrentSwarm>;
+pub type TorrentStatsDict = IndexMap<InfoHash, TorrentStats>;
diff --git a/src/models/tracker.rs b/src/models/tracker.rs
index 2a68483..2d27410 100644
--- a/src/models/tracker.rs
+++ b/src/models/tracker.rs
@@ -1,6 +1,6 @@
 use super::{
     common::{InfoHash, IntervalDuration, NumOfBytes, PeerId, PeerKey, Port},
-    torrent::TorrentStats,
+    torrent::{TorrentStats, TorrentStatsDict},
 };
 use crate::{
     constants,
@@ -217,21 +217,23 @@ pub struct ScrapeFiles(pub Vec<(InfoHash, TorrentStats)>);
 impl ScrapeFiles {
     fn bencode(&self, serializer: &mut bencode::Serializer) {
         serializer.start_dict();
+
         for (info_hash, stats) in &self.0 {
-            serializer.encode_bytes(info_hash.as_ref());
-            bencode_dict!(
-                serializer,
-                constants::TRACKER_RESPONSE_COMPLETE => bencode_int!(serializer, stats.seeders),
-                constants::TRACKER_RESPONSE_DOWNLOADED => bencode_int!(serializer, stats.completed),
-                constants::TRACKER_RESPONSE_INCOMPLETE => bencode_int!(serializer, stats.leechers)
-            );
+            bencode_file(serializer, info_hash, stats);
         }
+
         serializer.end_dict();
     }
 }
 
+fn bencode_file(serializer: &mut bencode::Serializer, info_hash: &InfoHash, stats: &TorrentStats) {
+    serializer.encode_bytes(info_hash.as_ref());
+    bencode_dict!(
+        serializer,
+        constants::TRACKER_RESPONSE_COMPLETE => bencode_int!(serializer, stats.seeders),
+        constants::TRACKER_RESPONSE_DOWNLOADED => bencode_int!(serializer, stats.completed),
+        constants::TRACKER_RESPONSE_INCOMPLETE => bencode_int!(serializer, stats.leechers)
+    );
+}
+
 /// Represents the error response sent by the tracker for `announce` or `scrape` request.
 #[derive(Debug, Serialize, Deserialize, Default)]
 pub struct TrackerError {
@@ -416,3 +418,38 @@ macro_rules! try_into_bytes {
 try_into_bytes!(AnnounceResponse);
 try_into_bytes!(ScrapeResponse);
 try_into_bytes!(TrackerError);
+
+#[derive(Debug)]
+pub struct FullScrapeResponse {
+    pub ser: Option<bencode::Serializer>,
+}
+
+impl FullScrapeResponse {
+    pub fn new() -> Self {
+        Self {
+            ser: Some(bencode::Serializer::new()),
+        }
+    }
+
+    pub fn output(&mut self) -> Option<Bytes> {
+        self.ser.take().map(|serializer| serializer.finalize())
+    }
+
+    pub fn bencode(&mut self, files: &TorrentStatsDict) {
+        if let Some(ref mut serializer) = self.ser {
+            serializer.start_dict();
+            bencode_str!(serializer, constants::TRACKER_RESPONSE_FILES);
+            serializer.start_dict();
+
+            for (info_hash, stats) in files {
+                bencode_file(serializer, info_hash, stats);
+            }
+
+            serializer.end_dict();
+            serializer.end_dict();
+        }
+    }
+}
diff --git a/src/servers/cache/full_scrape.rs b/src/servers/cache/full_scrape.rs
new file mode 100644
index 0000000..661564c
--- /dev/null
+++ b/src/servers/cache/full_scrape.rs
@@ -0,0 +1,83 @@
+use bytes::Bytes;
+use std::{sync::Arc, time::Duration};
+
+use super::Cache;
+use crate::{
+    models::{torrent::TorrentStatsDict, tracker::FullScrapeResponse},
+    storage::Processor,
+    worker::{FullScrapeProcessor, Task, TaskOutput, Worker},
+};
+
+#[derive(Debug, Default)]
+pub struct FullScrapeCache {
+    data: Option<Arc<Bytes>>,
+}
+
+impl FullScrapeCache {
+    pub fn new(data: Bytes) -> FullScrapeCache {
+        FullScrapeCache {
+            data: Some(Arc::new(data)),
+        }
+    }
+}
+
+pub async fn refresh(cache: Arc<Cache>, worker: Arc<Worker>, expires_in: Duration) {
+    // Mark the entry as refreshing under the write lock so concurrent
+    // callers cannot start a second full scrape (including on a cold cache).
+    let should_refresh = {
+        let mut cache = cache.full_scrape.write().await;
+        match cache.as_ref() {
+            Some(_) if cache.is_expired() && !cache.is_refreshing() => {
+                cache.set_refreshing();
+                true
+            }
+            None if !cache.is_refreshing() => {
+                cache.set_refreshing();
+                true
+            }
+            _ => false,
+        }
+    };
+
+    if !should_refresh {
+        return;
+    }
+
+    let task = Task::FullScrape(Box::new(FullScrapeResponse::new()));
+    let data = match worker.work(task).await {
+        Ok(TaskOutput::FullScrape(mut handler)) => handler.output().unwrap_or_default(),
+        _ => Bytes::new(),
+    };
+
+    let mut cache = cache.full_scrape.write().await;
+
+    cache.set(
+        FullScrapeCache::new(data),
+        Some(std::time::Instant::now() + expires_in),
+    );
+}
+
+impl std::ops::Deref for FullScrapeCache {
+    type Target = Option<Arc<Bytes>>;
+    fn deref(&self) -> &Self::Target {
+        &self.data
+    }
+}
+
+impl std::ops::DerefMut for FullScrapeCache {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.data
+    }
+}
+
+impl FullScrapeProcessor for FullScrapeResponse {
+    fn as_processor(&mut self) -> &mut dyn Processor<TorrentStatsDict> {
+        self
+    }
+
+    fn output(&mut self) -> Option<Bytes> {
+        self.output()
+    }
+}
+
+impl Processor<TorrentStatsDict> for FullScrapeResponse {
+    fn process(&mut self, input: &TorrentStatsDict) -> bool {
+        self.bencode(input);
+        false
+    }
+}
diff --git a/src/servers/cache/mod.rs b/src/servers/cache/mod.rs
new file mode 100644
index 0000000..bba1385
--- /dev/null
+++ b/src/servers/cache/mod.rs
@@ -0,0 +1,75 @@
+pub mod full_scrape;
+
+use tokio::sync::RwLock;
+
+use self::full_scrape::FullScrapeCache;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+pub struct Cache {
+    pub full_scrape: RwLock<CacheEntry<FullScrapeCache>>,
+}
+
+impl Cache {
+    pub fn new() -> Cache {
+        Cache {
+            full_scrape: RwLock::new(CacheEntry::default()),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct CacheEntry<T> {
+    data: T,
+    expires: Option<std::time::Instant>,
+    refreshing: AtomicBool,
+}
+
+impl<T> CacheEntry<T>
+where
+    T: Send + Sync,
+{
+    pub fn new(data: T, expires: Option<std::time::Instant>) -> CacheEntry<T> {
+        CacheEntry {
+            data,
+            expires,
+            refreshing: AtomicBool::new(false),
+        }
+    }
+
+    pub fn is_expired(&self) -> bool {
+        match self.expires {
+            Some(expires) => expires < std::time::Instant::now(),
+            None => true,
+        }
+    }
+
+    pub fn is_refreshing(&self) -> bool {
+        self.refreshing.load(Ordering::SeqCst)
+    }
+
+    pub fn set_refreshing(&mut self) {
+        self.refreshing.store(true, Ordering::SeqCst);
+    }
+
+    pub fn set(&mut self, data: T, expires: Option<std::time::Instant>) {
+        self.data = data;
+        self.expires = expires;
+        self.refreshing.store(false, Ordering::SeqCst);
+    }
+}
+
+impl<T> From<(T, std::time::Instant)> for CacheEntry<T>
+where
+    T: Send + Sync,
+{
+    fn from(value: (T, std::time::Instant)) -> CacheEntry<T> {
+        CacheEntry::new(value.0, Some(value.1))
+    }
+}
+
+impl<T> std::ops::Deref for CacheEntry<T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.data
+    }
+}
diff --git a/src/servers/http/handler.rs b/src/servers/http/handler.rs
index d84a0c7..ab63571 100644
--- a/src/servers/http/handler.rs
+++ b/src/servers/http/handler.rs
@@ -1,9 +1,10 @@
 use super::error::HttpError;
-use super::response::{Body, HttpResponse};
+use super::response::{Body, BodyStream, HttpResponse};
 use crate::constants;
 use crate::models::tracker::{
     AnnounceRequest, AnnounceResponse, ScrapeRequest, ScrapeResponse, TrackerError,
 };
+use crate::servers::cache::full_scrape;
 use crate::servers::http::request::HttpRequest;
 use crate::servers::State;
 use crate::utils::Loggable;
@@ -126,24 +127,39 @@ async fn scrape(req: HttpRequest, state: State) -> Result<HttpResponse, HttpError> {
     if request.info_hashes.is_empty() {
-        match state.config.allow_full_scrape() {
-            true => return full_scrape(state).await,
-            false => {
-                let err: TrackerError = constants::TRACKER_ERROR_FULL_SCRAPE_NOT_ALLOWED.into();
-                return HttpResponse::try_from(err);
-            }
-        }
+        return full_scrape(state).await;
     }
 
-    let task = Task::Scrape(Some(request));
+    let task = Task::Scrape(request);
     let response: ScrapeResponse = state.worker.work(task).await?.into();
 
     HttpResponse::try_from(response)
 }
 
 async fn full_scrape(state: State) -> Result<HttpResponse, HttpError> {
-    let response: ScrapeResponse = state.worker.work(Task::FullScrape(None)).await?.into();
-    HttpResponse::try_from(response)
+    if !state.config.allow_full_scrape() {
+        let err: TrackerError = constants::TRACKER_ERROR_FULL_SCRAPE_NOT_ALLOWED.into();
+        return HttpResponse::try_from(err);
+    }
+
+    let cache = state.cache.full_scrape.read().await;
+
+    if cache.is_expired() && !cache.is_refreshing() {
+        let state = state.clone();
+        let expires_in = state.config.full_scrape_cache_ttl();
+
+        tokio::spawn(async move {
+            full_scrape::refresh(state.cache, state.worker, expires_in).await;
+        });
+    }
+
+    match cache.as_ref() {
+        Some(val) => {
+            let stream = BodyStream::from(val.clone());
+            Ok(HttpResponse::from(stream))
+        }
+        _ => HttpResponse::try_from(ScrapeResponse::default()),
+    }
 }
 
 impl TryFrom<TrackerError> for HttpResponse {
diff --git a/src/servers/mod.rs b/src/servers/mod.rs
index f59b97a..f0a8192 100644
--- a/src/servers/mod.rs
+++ b/src/servers/mod.rs
@@ -1,3 +1,4 @@
+mod cache;
 mod http;
 mod udp;
 
@@ -5,6 +6,7 @@ use crate::config::TSConfig;
 use crate::worker::Worker;
 use std::sync::Arc;
 
+use self::cache::Cache;
 pub use self::http::HttpServer;
 pub use self::udp::UdpServer;
 
@@ -12,4 +14,15 @@
 pub struct State {
     pub worker: Arc<Worker>,
     pub config: Arc<TSConfig>,
+    pub cache: Arc<Cache>,
+}
+
+impl State {
+    pub fn new(worker: Arc<Worker>, config: Arc<TSConfig>) -> State {
+        State {
+            worker,
+            config,
+            cache: Arc::new(Cache::new()),
+        }
+    }
 }
diff --git a/src/storage/memory.rs b/src/storage/memory.rs
index dc9b9f8..27638e1 100644
--- a/src/storage/memory.rs
+++ b/src/storage/memory.rs
@@ -1,6 +1,4 @@
-use ahash::RandomState;
 use async_trait::async_trait;
-use indexmap::IndexMap;
 use tokio::sync::RwLock;
 
 use super::{Processor, Result, Storage};
@@ -9,13 +7,12 @@ use crate::{
     models::{
         common::InfoHash,
         peer::{Peer, PeerType},
-        torrent::{PeerDict, PeerIdKey, TorrentStats, TorrentSwarm},
+        torrent::{
+            PeerDict, PeerIdKey, TorrentStats, TorrentStatsDict, TorrentSwarm, TorrentSwarmDict,
+        },
     },
 };
 
-type TorrentSwarmDict = IndexMap<InfoHash, TorrentSwarm, RandomState>;
-type TorrentStatsDict = IndexMap<InfoHash, TorrentStats, RandomState>;
-
 #[derive(Debug)]
 pub struct MemoryStorage {
     swarms: RwLock<TorrentSwarmDict>,
@@ -105,10 +102,13 @@
         Ok(result)
     }
 
-    async fn get_all_torrent_stats(&self) -> Result<Vec<(InfoHash, TorrentStats)>> {
-        let iter = self.stats.read().await.clone().into_iter();
-        let files: Vec<(InfoHash, TorrentStats)> = iter.collect();
-        Ok(files)
+    async fn get_all_torrent_stats(
+        &self,
+        processor: &mut dyn Processor<TorrentStatsDict>,
+    ) -> Result<()> {
+        let stats = self.stats.read().await;
+        let _ = processor.process(&stats);
+        Ok(())
     }
 
     async fn put_peer_in_swarm(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 52b716a..a189f23 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,7 +1,7 @@
 use crate::config::StorageType;
 use crate::models::common::InfoHash;
 use crate::models::peer::{Peer, PeerType};
-use crate::models::torrent::{PeerDict, PeerIdKey, TorrentStats};
+use crate::models::torrent::{PeerDict, PeerIdKey, TorrentStats, TorrentStatsDict};
 use async_trait::async_trait;
 
 mod memory;
@@ -23,12 +23,16 @@ pub trait Storage: Sync + Send {
     async fn has_torrent(&self, info_hash: &InfoHash) -> Result<bool>;
 
     async fn get_torrent_stats(&self, info_hash: &InfoHash) -> Result<Option<TorrentStats>>;
+
     async fn get_multi_torrent_stats(
         &self,
         info_hashes: Vec<InfoHash>,
     ) -> Result<Vec<(InfoHash, TorrentStats)>>;
 
-    async fn get_all_torrent_stats(&self) -> Result<Vec<(InfoHash, TorrentStats)>>;
+    async fn get_all_torrent_stats(
+        &self,
+        processor: &mut dyn Processor<TorrentStatsDict>,
+    ) -> Result<()>;
 
     async fn put_peer_in_swarm(
diff --git a/src/storage/redis.rs b/src/storage/redis.rs
index d9b11a6..e00f5ce 100644
--- a/src/storage/redis.rs
+++ b/src/storage/redis.rs
@@ -7,7 +7,7 @@ use crate::{
     models::{
         common::InfoHash,
         peer::{Peer, PeerType},
-        torrent::{PeerDict, PeerIdKey, TorrentStats},
+        torrent::{PeerDict, PeerIdKey, TorrentStats, TorrentStatsDict},
     },
 };
 
@@ -56,7 +56,10 @@ impl Storage for RedisStorage {
         unimplemented!()
     }
 
-    async fn get_all_torrent_stats(&self) -> Result<Vec<(InfoHash, TorrentStats)>> {
+    async fn get_all_torrent_stats(
+        &self,
+        _processor: &mut dyn Processor<TorrentStatsDict>,
+    ) -> Result<()> {
         unimplemented!()
     }
 
diff --git a/src/worker/mod.rs b/src/worker/mod.rs
index 0b4ef1e..f5f40cc 100644
--- a/src/worker/mod.rs
+++ b/src/worker/mod.rs
@@ -1,8 +1,9 @@
 mod error;
 pub use error::{Error, Result};
 mod tasks;
+pub use tasks::full_scrape::FullScrapeProcessor;
 
-use self::tasks::{announce, scrape, State, TaskExecutor};
+use self::tasks::{announce, full_scrape, scrape, State, TaskExecutor};
 use crate::{config::TSConfig, storage::create_new_storage};
 use log::{debug, info};
 use std::sync::Arc;
@@ -61,9 +62,9 @@ impl Drop for Worker {
 }
 
 pub enum Task {
-    Announce(announce::Task),
-    Scrape(scrape::Task),
-    FullScrape(scrape::Task),
+    Announce(announce::Input),
+    Scrape(scrape::Input),
+    FullScrape(full_scrape::Input),
     UpdateState(State),
     Shutdown,
 }
 
@@ -71,6 +72,7 @@
 pub enum TaskOutput {
     Announce(announce::Output),
     Scrape(scrape::Output),
+    FullScrape(full_scrape::Output),
     None,
 }
 
@@ -92,12 +94,18 @@
         debug!("Worker loop received task {:?}", task);
 
         match task {
-            Task::Announce(task) => {
-                executor.execute(announce::TaskExecutor, task, sender, self.state.clone())
+            Task::Announce(input) => {
+                executor.execute(announce::TaskExecutor, input, sender, self.state.clone())
             }
-            Task::Scrape(task) | Task::FullScrape(task) => {
-                executor.execute(scrape::TaskExecutor, task, sender, self.state.clone())
+
+            Task::Scrape(input) => {
+                executor.execute(scrape::TaskExecutor, input, sender, self.state.clone())
+            }
+
+            Task::FullScrape(input) => {
+                executor.execute(full_scrape::TaskExecutor, input, sender, self.state.clone())
             }
+
             Task::UpdateState(state) => {
                 self.state = state;
                 let _ = sender.send(Ok(TaskOutput::None));
@@ -116,13 +124,13 @@
 struct Executor;
 
 impl Executor {
-    fn execute<E, T>(&self, executor: E, task: T, sender: TaskSender, state: State)
+    fn execute<E, I>(&self, executor: E, input: I, sender: TaskSender, state: State)
     where
-        E: TaskExecutor<Task = T> + 'static,
-        T: Send + 'static,
+        E: TaskExecutor<Input = I> + 'static,
+        I: Send + 'static,
     {
         tokio::spawn(async move {
-            let response = executor.execute(task, state).await;
+            let response = executor.execute(input, state).await;
             let _ = sender.send(response);
         });
     }
diff --git a/src/worker/tasks/announce.rs b/src/worker/tasks/announce.rs
index 9a5a20b..56888cb 100644
--- a/src/worker/tasks/announce.rs
+++ b/src/worker/tasks/announce.rs
@@ -19,18 +19,18 @@ use std::{net::IpAddr, ops::Range};
 
 pub struct TaskExecutor;
 
-pub type Task = (AnnounceRequest, IpAddr);
+pub type Input = (AnnounceRequest, IpAddr);
 pub type Output = AnnounceResponse;
 
 const NUM_ZERO: NumOfBytes = NumOfBytes(0);
 
 #[async_trait]
 impl super::TaskExecutor for TaskExecutor {
-    type Task = Task;
+    type Input = Input;
     type Output = Output;
 
-    async fn execute(&self, task: Self::Task, state: State) -> Result<TaskOutput> {
-        let (req, sender_addr) = task;
+    async fn execute(&self, input: Self::Input, state: State) -> Result<TaskOutput> {
+        let (req, sender_addr) = input;
 
         let storage = state.storage;
         let config = state.config;
diff --git a/src/worker/tasks/full_scrape.rs b/src/worker/tasks/full_scrape.rs
new file mode 100644
index 0000000..a73e19a
--- /dev/null
+++ b/src/worker/tasks/full_scrape.rs
@@ -0,0 +1,32 @@
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use super::State;
+use crate::{
+    models::torrent::TorrentStatsDict,
+    storage::Processor,
+    worker::{Result, TaskOutput},
+};
+
+pub type Input = Box<dyn FullScrapeProcessor>;
+pub type Output = Box<dyn FullScrapeProcessor>;
+
+pub struct TaskExecutor;
+
+#[async_trait]
+impl super::TaskExecutor for TaskExecutor {
+    type Input = Input;
+    type Output = Output;
+
+    async fn execute(&self, mut input: Self::Input, state: State) -> Result<TaskOutput> {
+        let processor: &mut dyn Processor<TorrentStatsDict> = input.as_processor();
+        state.storage.get_all_torrent_stats(processor).await?;
+
+        Ok(TaskOutput::FullScrape(input))
+    }
+}
+
+pub trait FullScrapeProcessor: Processor<TorrentStatsDict> + Send + Sync {
+    fn as_processor(&mut self) -> &mut dyn Processor<TorrentStatsDict>;
+    fn output(&mut self) -> Option<Bytes>;
+}
diff --git a/src/worker/tasks/mod.rs b/src/worker/tasks/mod.rs
index e76b983..5d1374a 100644
--- a/src/worker/tasks/mod.rs
+++ b/src/worker/tasks/mod.rs
@@ -1,4 +1,5 @@
 pub(super) mod announce;
+pub(super) mod full_scrape;
 pub(super) mod scrape;
 
 use super::{Result, TaskOutput};
@@ -12,10 +13,10 @@ pub(super) fn err(msg: &str) -> Result<TaskOutput> {
 
 #[async_trait]
 pub(super) trait TaskExecutor: Send + Sync {
-    type Task;
+    type Input;
     type Output;
 
-    async fn execute(&self, task: Self::Task, state: State) -> Result<TaskOutput>;
+    async fn execute(&self, task: Self::Input, state: State) -> Result<TaskOutput>;
 }
 
 #[derive(Clone)]
diff --git a/src/worker/tasks/scrape.rs b/src/worker/tasks/scrape.rs
index 371432a..86f7146 100644
--- a/src/worker/tasks/scrape.rs
+++ b/src/worker/tasks/scrape.rs
@@ -6,23 +6,21 @@ use crate::{
     worker::{Result, TaskOutput},
 };
 
-pub type Task = Option<ScrapeRequest>;
+pub type Input = ScrapeRequest;
 pub type Output = ScrapeResponse;
 
 pub struct TaskExecutor;
 
 #[async_trait]
 impl super::TaskExecutor for TaskExecutor {
-    type Task = Task;
+    type Input = Input;
     type Output = Output;
 
-    async fn execute(&self, task: Self::Task, state: State) -> Result<TaskOutput> {
-        let storage = state.storage;
-
-        let files = match task {
-            Some(req) => storage.get_multi_torrent_stats(req.info_hashes).await,
-            None => storage.get_all_torrent_stats().await,
-        }?;
+    async fn execute(&self, input: Self::Input, state: State) -> Result<TaskOutput> {
+        let files = state
+            .storage
+            .get_multi_torrent_stats(input.info_hashes)
+            .await?;
 
         let output = ScrapeResponse::new(ScrapeFiles(files));
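
---

Taken together, the pieces above implement a stale-while-revalidate cache:
full_scrape() always answers from the CacheEntry under a read lock, and at
most one background task per expiry window rebuilds the bencoded payload
through the worker. The standalone sketch below illustrates that pattern in
isolation; it is a minimal sketch assuming only tokio, and the names
(TtlCache, Entry, rebuild) are illustrative, not part of this patch.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tokio::sync::RwLock;

    // Stand-in for the patch's CacheEntry<T>: a value plus its expiry time
    // and a flag that guarantees at most one refresh runs at a time.
    struct Entry {
        value: String,
        expires: Option<Instant>, // None = never filled, counts as expired
        refreshing: AtomicBool,
    }

    struct TtlCache {
        entry: RwLock<Entry>,
    }

    impl TtlCache {
        fn new() -> Self {
            Self {
                entry: RwLock::new(Entry {
                    value: String::new(),
                    expires: None,
                    refreshing: AtomicBool::new(false),
                }),
            }
        }
    }

    // Serve whatever is cached; spawn a single rebuild task when stale.
    async fn get(cache: &Arc<TtlCache>, ttl: Duration) -> String {
        let entry = cache.entry.read().await;
        let expired = entry.expires.map_or(true, |t| t < Instant::now());

        // compare_exchange makes "expired and not yet refreshing" atomic,
        // so concurrent readers start at most one refresh between them.
        if expired
            && entry
                .refreshing
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
        {
            let cache = Arc::clone(cache);
            tokio::spawn(async move {
                let fresh = rebuild().await; // stands in for the full-scrape task
                let mut entry = cache.entry.write().await;
                entry.value = fresh;
                entry.expires = Some(Instant::now() + ttl);
                entry.refreshing.store(false, Ordering::SeqCst);
            });
        }

        // Stale-while-revalidate: the old (or initially empty) value is
        // returned immediately; later calls pick up the refreshed payload.
        entry.value.clone()
    }

    async fn rebuild() -> String {
        "d5:filesdee".to_string() // placeholder bencoded payload
    }

    #[tokio::main]
    async fn main() {
        let cache = Arc::new(TtlCache::new());
        println!("{}", get(&cache, Duration::from_secs(600)).await);
    }

As in the patch, a cold cache serves an empty value while the first rebuild
runs, which mirrors the fallback to ScrapeResponse::default() until the
full-scrape bytes are first cached.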