Commit

full scrape cache
a2agrawal committed Aug 17, 2023
1 parent e412e92 commit 0015e74
Showing 17 changed files with 348 additions and 68 deletions.
3 changes: 3 additions & 0 deletions conf/default.toml
@@ -90,6 +90,9 @@ allow_udp_scrape = true
# Set to true to allow a full scrape of the tracker (scraping all torrents).
allow_full_scrape = false

# The duration of time in secs for which a full scrape is cached.
full_scrape_cache_ttl = 600

# The maximum number of torrents to scrape in a single request.
max_multi_scrape_count = 64

6 changes: 1 addition & 5 deletions src/app.rs
@@ -17,11 +17,7 @@ pub fn start(config: TSConfig, stop_signal_rx: StopSignalRx) -> Vec<JoinHandle<(

jobs.push(worker_job);

let state = State {
config,
worker: Arc::new(worker),
};

let state = State::new(Arc::new(worker), config);
let http_server_job = start_http_server(state, stop_signal_rx.clone());

jobs.push(http_server_job);
8 changes: 8 additions & 0 deletions src/config/mod.rs
@@ -115,6 +115,10 @@ pub struct TrackerConfig {
/// Determines whether a full scrape is allowed.
pub allow_full_scrape: bool,

/// The duration of time for which a full scrape is cached.
#[serde(deserialize_with = "deserialize_secs_to_duration")]
pub full_scrape_cache_ttl: Duration,

/// The maximum number of torrents to scrape in a single request.
pub max_multi_scrape_count: u32,

@@ -297,6 +301,10 @@ impl TSConfig {
pub fn max_multi_scrape_count(&self) -> u32 {
self.tracker.max_multi_scrape_count
}

pub fn full_scrape_cache_ttl(&self) -> Duration {
self.tracker.full_scrape_cache_ttl
}
}

#[derive(Debug, Default, Clone)]
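
The helper named in #[serde(deserialize_with = "deserialize_secs_to_duration")] is not part of this diff. Below is a minimal sketch of what such a deserializer typically looks like, assuming the TOML value is a plain integer of seconds; the body is an assumption, not the crate's actual implementation.

use serde::{Deserialize, Deserializer};
use std::time::Duration;

// Hypothetical helper: reads a value such as `full_scrape_cache_ttl = 600`
// as a number of seconds and converts it into a Duration.
pub fn deserialize_secs_to_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
    D: Deserializer<'de>,
{
    let secs = u64::deserialize(deserializer)?;
    Ok(Duration::from_secs(secs))
}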
5 changes: 4 additions & 1 deletion src/models/torrent.rs
@@ -4,7 +4,7 @@ use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

use super::{
common::{PeerId, PEER_ID_LENGTH},
common::{InfoHash, PeerId, PEER_ID_LENGTH},
peer::{Peer, PeerType},
};

@@ -210,3 +210,6 @@ impl AsRef<[u8]> for PeerIdKey {
&self.0
}
}

pub type TorrentSwarmDict = IndexMap<InfoHash, TorrentSwarm, RandomState>;
pub type TorrentStatsDict = IndexMap<InfoHash, TorrentStats, RandomState>;
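
The new aliases use indexmap's IndexMap rather than a plain HashMap, so iterating the stats dictionary follows insertion order, which keeps the bencoded full scrape layout deterministic. A small illustrative snippet with generic keys (not the crate's InfoHash type):

use indexmap::IndexMap;
use std::collections::hash_map::RandomState;

fn main() {
    let mut dict: IndexMap<&str, u32, RandomState> = IndexMap::default();
    dict.insert("zzz", 1);
    dict.insert("aaa", 2);

    // Iteration order matches insertion order, not hash or key order.
    let keys: Vec<_> = dict.keys().copied().collect();
    assert_eq!(keys, ["zzz", "aaa"]);
}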
57 changes: 47 additions & 10 deletions src/models/tracker.rs
@@ -1,6 +1,6 @@
use super::{
common::{InfoHash, IntervalDuration, NumOfBytes, PeerId, PeerKey, Port},
torrent::TorrentStats,
torrent::{TorrentStats, TorrentStatsDict},
};
use crate::{
constants,
@@ -217,21 +217,23 @@ pub struct ScrapeFiles(pub Vec<(InfoHash, TorrentStats)>);
impl ScrapeFiles {
fn bencode(&self, serializer: &mut bencode::Serializer) {
serializer.start_dict();

for (info_hash, stats) in &self.0 {
serializer.encode_bytes(info_hash.as_ref());
bencode_dict!(
serializer,
constants::TRACKER_RESPONSE_COMPLETE => bencode_int!(serializer, stats.seeders),
constants::TRACKER_RESPONSE_DOWNLOADED => bencode_int!(serializer, stats.completed),
constants::TRACKER_RESPONSE_INCOMPLETE => bencode_int!(serializer, stats.leechers)
);
bencode_file(serializer, info_hash, stats);
}

serializer.end_dict();
}
}

fn bencode_file(serializer: &mut bencode::Serializer, info_hash: &InfoHash, stats: &TorrentStats) {
serializer.encode_bytes(info_hash.as_ref());
bencode_dict!(
serializer,
constants::TRACKER_RESPONSE_COMPLETE => bencode_int!(serializer, stats.seeders),
constants::TRACKER_RESPONSE_DOWNLOADED => bencode_int!(serializer, stats.completed),
constants::TRACKER_RESPONSE_INCOMPLETE => bencode_int!(serializer, stats.leechers)
);
}

/// Represents the error response sent by the tracker for `announce` or `scrape` request.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct TrackerError {
@@ -416,3 +418,38 @@ macro_rules! try_into_bytes {
try_into_bytes!(AnnounceResponse);
try_into_bytes!(ScrapeResponse);
try_into_bytes!(TrackerError);

#[derive(Debug)]
pub struct FullScrapeResponse {
pub ser: Option<bencode::Serializer>,
}

impl FullScrapeResponse {
pub fn new() -> Self {
Self {
ser: Some(bencode::Serializer::new()),
}
}

pub fn output(&mut self) -> Option<bytes::Bytes> {
match self.ser.take() {
Some(serializer) => Some(serializer.finalize()),
None => None,
}
}

pub fn bencode(&mut self, files: &TorrentStatsDict) {
if let Some(ref mut serializer) = self.ser {
serializer.start_dict();
bencode_str!(serializer, constants::TRACKER_RESPONSE_FILES);
serializer.start_dict();

for (info_hash, stats) in files {
bencode_file(serializer, info_hash, stats);
}

serializer.end_dict();
serializer.end_dict();
}
}
}
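
For orientation, assuming the TRACKER_RESPONSE_* constants carry the conventional scrape keys ("files", "complete", "downloaded", "incomplete"), the payload FullScrapeResponse emits for a single torrent would look roughly like this (info-hash shown as a placeholder, counts illustrative):

d5:filesd20:<20-byte info hash>d8:completei10e10:downloadedi120e10:incompletei3eeee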
83 changes: 83 additions & 0 deletions src/servers/cache/full_scrape.rs
@@ -0,0 +1,83 @@
use bytes::Bytes;
use std::{sync::Arc, time::Duration};

use super::Cache;
use crate::{
models::{torrent::TorrentStatsDict, tracker::FullScrapeResponse},
storage::Processor,
worker::{FullScrapeProcessor, Task, TaskOutput, Worker},
};

#[derive(Debug, Default)]
pub struct FullScrapeCache {
data: Option<Arc<bytes::Bytes>>,
}

impl FullScrapeCache {
pub fn new(data: bytes::Bytes) -> FullScrapeCache {
FullScrapeCache {
data: Some(Arc::new(data)),
}
}
}

pub async fn refresh(cache: Arc<Cache>, worker: Arc<Worker>, expires_in: Duration) {
let should_refresh = {
let mut cache = cache.full_scrape.write().await;
match cache.as_ref() {
Some(_) if cache.is_expired() && !cache.is_refreshing() => {
cache.set_refreshing();
true
}
None => true,
_ => false,
}
};

if !should_refresh {
return;
}

let task = Task::FullScrape(Box::new(FullScrapeResponse::new()));
let data = match worker.work(task).await {
Ok(TaskOutput::FullScrape(mut handler)) => handler.output().unwrap_or_default(),
_ => Bytes::new(),
};

let mut cache = cache.full_scrape.write().await;

cache.set(
FullScrapeCache::new(data),
Some(std::time::Instant::now() + expires_in),
);
}

impl std::ops::Deref for FullScrapeCache {
type Target = Option<Arc<bytes::Bytes>>;
fn deref(&self) -> &Self::Target {
&self.data
}
}

impl std::ops::DerefMut for FullScrapeCache {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}

impl FullScrapeProcessor for FullScrapeResponse {
fn as_processor(&mut self) -> &mut dyn Processor<TorrentStatsDict> {
self
}

fn output(&mut self) -> Option<Bytes> {
self.output()
}
}

impl Processor<TorrentStatsDict> for FullScrapeResponse {
fn process(&mut self, input: &TorrentStatsDict) -> bool {
self.bencode(input);
return false;
}
}
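
The worker side of Task::FullScrape is outside this diff; the hypothetical driver below only illustrates how the Processor and FullScrapeProcessor impls above fit together, assuming the crate's types are in scope.

// Illustrative only: hand the stats dictionary to the processor, then take the bytes.
fn build_full_scrape(stats: &TorrentStatsDict) -> bytes::Bytes {
    let mut response = FullScrapeResponse::new();
    // `process` bencodes the entire dictionary in a single pass.
    response.as_processor().process(stats);
    // `output` consumes the inner serializer; fall back to an empty payload.
    response.output().unwrap_or_default()
}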
75 changes: 75 additions & 0 deletions src/servers/cache/mod.rs
@@ -0,0 +1,75 @@
pub mod full_scrape;

use tokio::sync::RwLock;

use self::full_scrape::FullScrapeCache;
use std::sync::atomic::{AtomicBool, Ordering};

pub struct Cache {
pub full_scrape: RwLock<CacheEntry<FullScrapeCache>>,
}

impl Cache {
pub fn new() -> Cache {
Cache {
full_scrape: RwLock::new(CacheEntry::default()),
}
}
}

#[derive(Default)]
pub struct CacheEntry<T> {
data: T,
expires: Option<std::time::Instant>,
refreshing: AtomicBool,
}

impl<T> CacheEntry<T>
where
T: Send + Sync,
{
pub fn new(data: T, expires: Option<std::time::Instant>) -> CacheEntry<T> {
CacheEntry {
data,
expires,
refreshing: AtomicBool::new(false),
}
}

pub fn is_expired(&self) -> bool {
match self.expires {
Some(expires) => expires < std::time::Instant::now(),
None => true,
}
}

pub fn is_refreshing(&self) -> bool {
self.refreshing.load(Ordering::SeqCst)
}

pub fn set_refreshing(&mut self) {
self.refreshing.store(true, Ordering::SeqCst);
}

pub fn set(&mut self, data: T, expires: Option<std::time::Instant>) {
self.data = data;
self.expires = expires;
self.refreshing.store(false, Ordering::SeqCst);
}
}

impl<T> From<(T, std::time::Instant)> for CacheEntry<T>
where
T: Send + Sync,
{
fn from(value: (T, std::time::Instant)) -> CacheEntry<T> {
CacheEntry::new(value.0, Some(value.1))
}
}

impl<T> std::ops::Deref for CacheEntry<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.data
}
}
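
A minimal usage sketch of CacheEntry on its own, with an arbitrary Vec<u8> payload and illustrative values:

use std::time::{Duration, Instant};

fn cache_entry_demo() {
    let mut entry: CacheEntry<Vec<u8>> = CacheEntry::new(Vec::new(), None);
    assert!(entry.is_expired()); // a missing expiry instant is treated as expired

    entry.set(vec![1, 2, 3], Some(Instant::now() + Duration::from_secs(600)));
    assert!(!entry.is_expired()); // fresh for the next ten minutes
    assert_eq!(*entry, vec![1, 2, 3]); // Deref exposes the cached data
}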
38 changes: 27 additions & 11 deletions src/servers/http/handler.rs
@@ -1,9 +1,10 @@
use super::error::HttpError;
use super::response::{Body, HttpResponse};
use super::response::{Body, BodyStream, HttpResponse};
use crate::constants;
use crate::models::tracker::{
AnnounceRequest, AnnounceResponse, ScrapeRequest, ScrapeResponse, TrackerError,
};
use crate::servers::cache::full_scrape;
use crate::servers::http::request::HttpRequest;
use crate::servers::State;
use crate::utils::Loggable;
@@ -126,24 +127,39 @@ async fn scrape(req: HttpRequest<IncomingBody>, state: State) -> Result<HttpResp
let request: ScrapeRequest = req.query_params()?;

if request.info_hashes.is_empty() {
match state.config.allow_full_scrape() {
true => return full_scrape(state).await,
false => {
let err: TrackerError = constants::TRACKER_ERROR_FULL_SCRAPE_NOT_ALLOWED.into();
return HttpResponse::try_from(err);
}
}
return full_scrape(state).await;
}

let task = Task::Scrape(Some(request));
let task = Task::Scrape(request);
let response: ScrapeResponse = state.worker.work(task).await?.into();

HttpResponse::try_from(response)
}

async fn full_scrape(state: State) -> Result<HttpResponse, HttpError> {
let response: ScrapeResponse = state.worker.work(Task::FullScrape(None)).await?.into();
HttpResponse::try_from(response)
if !state.config.allow_full_scrape() {
let err: TrackerError = constants::TRACKER_ERROR_FULL_SCRAPE_NOT_ALLOWED.into();
return HttpResponse::try_from(err);
}

let cache = state.cache.full_scrape.read().await;

if cache.is_expired() && !cache.is_refreshing() {
let state = state.clone();
let expires_in = state.config.full_scrape_cache_ttl();

tokio::spawn(async move {
full_scrape::refresh(state.cache, state.worker, expires_in).await;
});
}

match cache.as_ref() {
Some(val) => {
let stream = BodyStream::from(val.clone());
return Ok(HttpResponse::from(stream));
}
_ => HttpResponse::try_from(ScrapeResponse::default()),
}
}

impl TryFrom<TrackerError> for HttpResponse {
13 changes: 13 additions & 0 deletions src/servers/mod.rs
@@ -1,15 +1,28 @@
mod cache;
mod http;
mod udp;

use crate::config::TSConfig;
use crate::worker::Worker;
use std::sync::Arc;

use self::cache::Cache;
pub use self::http::HttpServer;
pub use self::udp::UdpServer;

#[derive(Clone)]
pub struct State {
pub worker: Arc<Worker>,
pub config: Arc<TSConfig>,
pub cache: Arc<Cache>,
}

impl State {
pub fn new(worker: Arc<Worker>, config: Arc<TSConfig>) -> State {
State {
worker,
config,
cache: Arc::new(Cache::new()),
}
}
}