add Deferred type for deferred retirement batches
ibraheemdev committed May 11, 2024
1 parent 98c34e3 commit 99d597b
Showing 6 changed files with 388 additions and 121 deletions.
4 changes: 2 additions & 2 deletions src/collector.rs
@@ -210,8 +210,8 @@ impl Collector {
/// Retiring the same pointer twice can cause **undefined behavior**, even if the
/// reclaimer doesn't free memory.
///
/// Additionally, the reclaimer passed to `retire` must correctly free values of
/// type `T`.
/// Additionally, the pointer must be valid to access as a [`Link`], per the [`AsLink`]
/// trait, and the reclaimer passed to `retire` must correctly free values of type `T`.
///
/// # Examples
///
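For reference, the layout contract that [`AsLink`] encodes: a type is pointer-castable to a `Link` when the link is its first field under `#[repr(C)]`. A minimal sketch of a custom type satisfying it (the `MyNode` name is illustrative, and `Linked<T>` already provides this for most uses):

```rust
use seize::{AsLink, Link};

// `#[repr(C)]` fixes the field order, so a `*mut MyNode<T>` can be
// soundly cast to a `*mut Link` pointing at the first field.
#[repr(C)]
struct MyNode<T> {
    link: Link, // must be the first field
    value: T,
}

// safety: `MyNode` is `#[repr(C)]` and begins with a `Link`
unsafe impl<T> AsLink for MyNode<T> {}
```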
132 changes: 132 additions & 0 deletions src/deferred.rs
@@ -0,0 +1,132 @@
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};

use crate::raw::Node;
use crate::tls::Thread;
use crate::{AsLink, Collector, Link};

/// A batch of pointers to be reclaimed in the future.
///
/// Sometimes it is necessary to defer the retirement of a batch of pointers.
/// For example, a set of pointers may be reachable from multiple locations in a data structure
/// and can only be retired after a specific object is reclaimed. In such cases, the [`Deferred`] type
/// can serve as a cheap place to defer the retirement of pointers, without allocating extra memory.
///
/// [`Deferred`] is a concurrent list, meaning that pointers can be added from multiple threads
/// concurrently. It is not meant to amortize the cost of retirement, which is already handled by
/// the thread-local batches controlled with [`Collector::batch_size`]; accessing a concurrent
/// batch from a single thread is more expensive than necessary. Deferred batches are useful when
/// you need to control directly when a batch of nodes is retired, a relatively rare use case.
///
/// # Examples
///
/// ```rust
/// # use std::sync::atomic::{AtomicPtr, Ordering};
/// # use std::sync::Arc;
/// # use seize::{reclaim, Collector, Deferred, Linked};
/// let collector = Collector::new().batch_size(10);
///
/// // allocate a set of pointers
/// let items = (0..10)
///     .map(|i: usize| AtomicPtr::new(collector.link_boxed(i)))
///     .collect::<Arc<[_]>>();
///
/// // create a batch of nodes to retire
/// let mut batch = Deferred::new();
///
/// for item in items.iter() {
///     let new = collector.link_boxed(0);
///
///     // make the item unreachable with an atomic swap
///     let old = item.swap(new, Ordering::AcqRel);
///
///     // don't retire just yet, add the node to the batch
///     unsafe { batch.defer(old) };
/// }
///
/// // sometime later... retire all the items in the batch
/// unsafe { batch.retire_all(&collector, reclaim::boxed::<Linked<usize>>) }
/// ```
pub struct Deferred {
head: AtomicPtr<Node>,
pub(crate) min_epoch: AtomicU64,
}

impl Deferred {
/// Create a new batch of deferred nodes.
pub const fn new() -> Deferred {
Deferred {
head: AtomicPtr::new(ptr::null_mut()),
min_epoch: AtomicU64::new(0),
}
}

/// Add an object to the batch.
///
/// # Safety
///
/// After this method is called, it is *undefined behavior* to add this pointer to the
/// batch again, or any other batch. The pointer must also be valid for access as a [`Link`],
/// per the [`AsLink`] trait.
pub unsafe fn defer<T: AsLink>(&self, ptr: *mut T) {
// `ptr` is guaranteed to be a valid pointer that can be cast to a node (`T: AsLink`)
//
// any other thread with a reference to the pointer only has a shared
// reference to the UnsafeCell<Node>, which is allowed to alias. the caller
// guarantees that the same pointer is not deferred twice, so we can safely read/write
// to the node through this pointer.
let node = UnsafeCell::raw_get(ptr.cast::<UnsafeCell<Node>>());

let birth_epoch = unsafe { (*node).birth_epoch };

// keep track of the oldest node in the batch
self.min_epoch.fetch_min(birth_epoch, Ordering::Relaxed);

// relaxed: we never access head
let mut prev = self.head.load(Ordering::Relaxed);

loop {
unsafe { (*node).next_batch = prev }

// relaxed: head is only ever accessed through a mutable reference
match self
.head
.compare_exchange_weak(prev, node, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => return,
Err(found) => prev = found,
}
}
}

/// Retires a batch of values, running `reclaim` when no threads hold a reference to any
/// nodes in the batch.
///
/// Note that this method is disconnected from any guards on the current thread,
/// so the pointers may be reclaimed immediately.
///
/// # Safety
///
/// The safety requirements of [`Collector::retire`] apply to each node in the batch.
///
/// [`Collector::retire`]: crate::Collector::retire
pub unsafe fn retire_all(&mut self, collector: &Collector, reclaim: unsafe fn(*mut Link)) {
// note that `add_batch` doesn't ever actually reclaim the pointer immediately if
// the current thread is active, similar to `retire`.
unsafe { collector.raw.add_batch(self, reclaim, Thread::current()) }
}

/// Run a function for each node in the batch.
///
/// This function does not consume the batch and can be called multiple
/// times, **before retirement**.
pub fn for_each(&mut self, mut f: impl FnMut(*mut Node)) {
let mut list = *self.head.get_mut();

while !list.is_null() {
let curr = list;

// safety: `curr` is a valid non-null node in the list
list = unsafe { (*curr).next_batch };

f(curr);
}
}
}
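Since `defer` takes `&self` while `retire_all` takes `&mut self`, a batch can be filled from several threads and then retired by a single owner once exclusive access is regained. A minimal sketch of that pattern with scoped threads (the slot setup and thread count are illustrative; the replacement values are leaked for brevity):

```rust
use std::sync::atomic::{AtomicPtr, Ordering};
use seize::{reclaim, Collector, Deferred, Linked};

let collector = Collector::new();
let slots: Vec<AtomicPtr<Linked<usize>>> = (0..4)
    .map(|i| AtomicPtr::new(collector.link_boxed(i)))
    .collect();

let mut batch = Deferred::new();

std::thread::scope(|s| {
    for slot in &slots {
        let (batch, collector) = (&batch, &collector);
        s.spawn(move || {
            // unlink the value with a swap, making it unreachable
            let old = slot.swap(collector.link_boxed(0), Ordering::AcqRel);
            // defer its retirement; `defer` takes `&self`, so
            // concurrent calls from multiple threads are allowed
            unsafe { batch.defer(old) };
        });
    }
});

// the shared borrows ended with the scope, so the batch can be retired
unsafe { batch.retire_all(&collector, reclaim::boxed::<Linked<usize>>) };
```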
10 changes: 6 additions & 4 deletions src/guard.rs
@@ -61,11 +61,13 @@ pub trait Guard {
/// # Safety
///
/// The retired object must no longer be accessible to any thread that enters
/// after it is removed. Additionally, the reclaimer passed to `retire` must
/// correctly free values of type `T`.
/// after it is removed.
///
/// Retiring the same pointer twice can cause **undefined behavior**, even if the
/// reclaimer doesn't free memory.
///
/// Additionally, the pointer must be valid to access as a [`Link`], per the [`AsLink`] trait,
/// and the reclaimer passed to `retire` must correctly free values of type `T`.
unsafe fn defer_retire<T: AsLink>(&self, ptr: *mut T, reclaim: unsafe fn(*mut Link));

/// Returns a numeric identifier for the current thread.
@@ -203,7 +205,7 @@ impl fmt::Debug for LocalGuard<'_> {
/// Most of the functionality provided by this type is through the [`Guard`] trait.
pub struct OwnedGuard<'a> {
collector: &'a Collector,
// the current thread
// an owned thread
thread: Thread,
}

@@ -281,7 +283,7 @@ impl Drop for OwnedGuard<'_> {
// safety: we have ownership of `thread`
let reservation = unsafe { self.collector.raw.reservation(self.thread) };

// safety: self.thread is the current thread
// safety: self.thread is an owned thread
unsafe { self.collector.raw.leave(reservation) };

// we are now inactive and can free the thread slot
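To ground the safety conditions on `defer_retire` above, a minimal sketch of retiring through a [`LocalGuard`] (names are illustrative; the replacement value is leaked for brevity):

```rust
use std::sync::atomic::{AtomicPtr, Ordering};
use seize::{reclaim, Collector, Guard, Linked};

let collector = Collector::new();
let slot = AtomicPtr::new(collector.link_boxed(42_usize));

let guard = collector.enter();
// unlink the old value so threads entering after this point cannot reach it
let old = slot.swap(collector.link_boxed(0), Ordering::AcqRel);
// safety: `old` is no longer reachable and is retired exactly once
unsafe { guard.defer_retire(old, reclaim::boxed::<Linked<usize>>) };
drop(guard);
```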
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -2,6 +2,7 @@
#![doc = include_str!("../README.md")]

mod collector;
mod deferred;
mod guard;
mod raw;
mod tls;
@@ -11,4 +12,5 @@ pub mod guide;
pub mod reclaim;

pub use collector::{AsLink, Collector, Link, Linked};
pub use deferred::Deferred;
pub use guard::{unprotected, Guard, LocalGuard, OwnedGuard, UnprotectedGuard};
64 changes: 52 additions & 12 deletions src/raw.rs
@@ -1,9 +1,8 @@
use crate::tls::{Thread, ThreadLocal};
use crate::utils::CachePadded;
use crate::{AsLink, Link};
use crate::{AsLink, Deferred, Link};

use std::cell::{Cell, UnsafeCell};
use std::mem::ManuallyDrop;
use std::num::NonZeroU64;
use std::ptr::{self, NonNull};
use std::sync::atomic::{self, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
@@ -270,14 +269,11 @@ impl Collector {
// to the node through this pointer.
let node = UnsafeCell::raw_get(ptr.cast::<UnsafeCell<Node>>());

// if a thread is active in the minimum birth era, it has access to at least one
// of the nodes in the batch and must be tracked.
// keep track of the oldest node in the batch
//
// if epoch tracking is disabled this will always be false (0 > 0).
let birth_epoch = unsafe { (*node).birth_epoch };
if batch.min_epoch > birth_epoch {
batch.min_epoch = birth_epoch;
}
batch.min_epoch = batch.min_epoch.min(birth_epoch);

// create an entry for this node
batch.entries.push(Entry {
@@ -292,6 +288,48 @@
}
}

// Retire a batch of nodes.
//
// # Safety
//
// The batch must no longer be accessible to any inactive threads.
// This method is not safe to call concurrently with the same `thread`.
pub unsafe fn add_batch(
&self,
deferred: &mut Deferred,
reclaim: unsafe fn(*mut Link),
thread: Thread,
) {
// safety: local batches are only accessed by the current thread until retirement
let local_batch = unsafe {
&mut *self
.batches
.load_or(|| LocalBatch::new(self.batch_size), thread)
.get()
};

// safety: local batch pointers are always valid until reclamation
let batch = unsafe { local_batch.0.as_mut() };

// keep track of the oldest node in the batch
let min_epoch = *deferred.min_epoch.get_mut();
batch.min_epoch = batch.min_epoch.min(min_epoch);

deferred.for_each(|node| {
// create an entry for this node
batch.entries.push(Entry {
node,
reclaim,
batch: local_batch.0.as_ptr(),
});
});

// attempt to retire the batch if we have enough entries
if batch.entries.len() % self.batch_size == 0 {
unsafe { self.try_retire(local_batch, thread) }
}
}

// Attempt to retire nodes in the current thread's batch.
//
// # Safety
@@ -419,7 +457,7 @@ impl Collector {
}

// link this node to the reservation list
unsafe { *(*curr.node).next = prev }
unsafe { (*curr.node).next = prev }

// release: release the node and entries in this batch
match head.compare_exchange_weak(
@@ -465,8 +503,8 @@ impl Collector {
while !list.is_null() {
let curr = list;

// safety: `curr` is a valid node in the list
list = unsafe { *(*(*curr).node).next };
// safety: `curr` is a valid non-null node in the list
list = unsafe { (*(*curr).node).next };
let batch = unsafe { (*curr).batch };

// safety: batch pointers are valid for reads until they are freed
@@ -518,11 +556,13 @@ impl Drop for Collector {
#[repr(C)]
pub union Node {
// Before retiring: the epoch this node was created in
birth_epoch: u64,
pub birth_epoch: u64,
// While retiring: temporary location for an active reservation list.
head: *const AtomicPtr<Entry>,
// After retiring: next node in the thread's reservation list
next: ManuallyDrop<*mut Entry>,
next: *mut Entry,
// In deferred batch: next node in the batch
pub next_batch: *mut Node,
}

// A per-thread reservation list.
