[wip] Cumulative micro opts (1-2% perf. gain)
ryoqun committed Mar 8, 2024
1 parent 1d111dc commit bf1ff3e
Showing 1 changed file with 84 additions and 77 deletions.
161 changes: 84 additions & 77 deletions crossbeam-channel/src/flavors/list.rs
@@ -5,7 +5,7 @@ use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::sync::atomic::{self, AtomicPtr, AtomicU8, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};
@@ -25,14 +25,15 @@ use crate::waker::SyncWaker;
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
const WRITE: u8 = 1;
const READ: u8 = 2;
const DESTROY: u8 = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
const NEAR_BLOCK_CAP: usize = LAP - 2;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
@@ -41,20 +42,12 @@ const SHIFT: usize = 1;
const MARK_BIT: usize = 1;

/// A slot in a block.
struct Slot<T> {
/// The message.
msg: UnsafeCell<MaybeUninit<T>>,

/// The state of the slot.
state: AtomicUsize,
struct Slot<'a, T> {
state: &'a AtomicU8,
msg: &'a UnsafeCell<T>,
}

impl<T> Slot<T> {
const UNINIT: Self = Self {
msg: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};

impl<T> Slot<'_, T> {
/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
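
(Note, not part of the diff: a minimal sketch of how the index constants above fit together, assuming only what is visible in this file. An index keeps a mark in its low SHIFT bit and the sequence position above it; the offset inside the current block is that position modulo LAP, and NEAR_BLOCK_CAP = BLOCK_CAP - 1 is the last offset that actually stores a message, which is why the `offset + 1 == BLOCK_CAP` comparisons elsewhere in this diff become `offset == NEAR_BLOCK_CAP`.)

    // Illustrative sketch only; mirrors the constants above.
    const SHIFT: usize = 1;
    const MARK_BIT: usize = 1;
    const LAP: usize = 32;
    const NEAR_BLOCK_CAP: usize = LAP - 2; // 30: last offset that stores a message

    fn offset_of(index: usize) -> usize {
        (index >> SHIFT) % LAP
    }

    fn main() {
        let tail = (62 << SHIFT) | MARK_BIT; // sequence position 62, mark bit set
        assert_eq!(offset_of(tail), NEAR_BLOCK_CAP); // 62 % 32 == 30
        assert!(tail & MARK_BIT != 0); // in the tail index this signals disconnection
    }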
@@ -71,16 +64,23 @@ struct Block<T> {
/// The next block in the linked list.
next: AtomicPtr<Block<T>>,

/// Slots for messages.
slots: [Slot<T>; BLOCK_CAP],
/// States for slots.
states: [AtomicU8; BLOCK_CAP],

/// Messages for slots.
msgs: MaybeUninit<[UnsafeCell<T>; BLOCK_CAP]>,
}

impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Self {
#[allow(clippy::declare_interior_mutable_const)]
const UNINIT_STATE: AtomicU8 = AtomicU8::new(0);

Self {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
states: [UNINIT_STATE; BLOCK_CAP],
msgs: MaybeUninit::uninit(),
}
}
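
(Aside, not from the commit: `[AtomicU8::new(0); BLOCK_CAP]` would be rejected because `AtomicU8` is not `Copy`, but an array repeat expression does accept a `const` item, re-evaluating it once per element; the clippy allow silences the usual warning about declaring a const with interior mutability. A self-contained sketch of the same trick:)

    use std::sync::atomic::AtomicU8;

    #[allow(clippy::declare_interior_mutable_const)]
    const ZERO_STATE: AtomicU8 = AtomicU8::new(0);

    fn zeroed_states<const N: usize>() -> [AtomicU8; N] {
        // The const item is re-evaluated for each element, so no Copy bound is needed.
        [ZERO_STATE; N]
    }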

@@ -96,12 +96,19 @@ impl<T> Block<T> {
}
}

unsafe fn get_slot_unchecked(&self, i: usize) -> Slot<'_, T> {
Slot {
msg: unsafe { self.msgs.assume_init_ref().get_unchecked(i) },
state: unsafe { self.states.get_unchecked(i) },
}
}

/// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
unsafe fn destroy(this: *mut Self, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
let slot = unsafe { (*this).slots.get_unchecked(i) };
let slot = unsafe { (*this).get_slot_unchecked(i) };

// Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0
@@ -196,39 +203,32 @@ impl<T> Channel<T> {
}

/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
fn start_send(&self, token: &mut Token) {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
let mut next_block = None;
let mut next_block = ptr::null_mut::<Block<T>>();

let mut tail;
let mut block;
loop {
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);

// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
token.list.block = ptr::null();
return true;
break;
}

// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
if offset > NEAR_BLOCK_CAP {
// If we reached the end of the block, wait until the next one is installed.
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}

// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
}

// If this is the first message to be sent into the channel, we need to allocate the
// first block and install it.
if block.is_null() {
} else if block.is_null() {
// If this is the first message to be sent into the channel, we need to allocate
// the first block and install it.
let new = Box::into_raw(Box::new(Block::<T>::new()));

if self
@@ -240,16 +240,17 @@ impl<T> Channel<T> {
self.head.block.store(new, Ordering::Release);
block = new;
} else {
next_block = unsafe { Some(Box::from_raw(new)) };
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
next_block = new;
continue;
}
} else if offset == NEAR_BLOCK_CAP && next_block.is_null() {
// If we're going to have to install the next block, allocate it in advance in
// order to make the wait for other threads as short as possible.
next_block = Box::into_raw(Box::new(Block::<T>::new()));
}

let new_tail = tail + (1 << SHIFT);

// Try advancing the tail forward.
let new_tail = tail + (1 << SHIFT);
match self.tail.index.compare_exchange_weak(
tail,
new_tail,
@@ -258,24 +259,26 @@ impl<T> Channel<T> {
) {
Ok(_) => unsafe {
// If we've reached the end of the block, install the next one.
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(next_block.unwrap());
self.tail.block.store(next_block, Ordering::Release);
if offset == NEAR_BLOCK_CAP {
let n = std::mem::replace(&mut next_block, ptr::null_mut());
self.tail.block.store(n, Ordering::Release);
self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
(*block).next.store(next_block, Ordering::Release);
(*block).next.store(n, Ordering::Release);
}

token.list.block = block as *const u8;
token.list.offset = offset;
return true;
break;
},
Err(t) => {
tail = t;
block = self.tail.block.load(Ordering::Acquire);
Err(_) => {
backoff.spin();
}
}
}

if !next_block.is_null() {
drop(unsafe { Box::from_raw(next_block) });
}
}
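
(Aside, not from the commit: start_send above signals disconnection by leaving token.list.block null and breaking out, and it keeps at most one spare block allocation alive across retries, reclaiming it after the loop if it was never installed. A minimal sketch of that pre-allocate-then-reclaim pattern, with a stand-in payload type:)

    use std::ptr;

    fn spare_alloc_demo(expect_to_need_one: bool, installed: bool) {
        // Stand-in for `*mut Block<T>`: allocated eagerly, published lazily.
        let mut spare: *mut [u8; 64] = ptr::null_mut();

        if expect_to_need_one && spare.is_null() {
            spare = Box::into_raw(Box::new([0u8; 64]));
        }

        if installed && !spare.is_null() {
            // Hand the allocation off so the cleanup below skips it.
            let n = std::mem::replace(&mut spare, ptr::null_mut());
            drop(unsafe { Box::from_raw(n) }); // a real channel would publish `n` instead
        }

        // Whatever was never installed is reclaimed exactly once on exit.
        if !spare.is_null() {
            drop(unsafe { Box::from_raw(spare) });
        }
    }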

/// Writes a message into the channel.
@@ -288,8 +291,8 @@ impl<T> Channel<T> {
// Write the message into the slot.
let block = token.list.block.cast::<Block<T>>();
let offset = token.list.offset;
let slot = unsafe { (*block).slots.get_unchecked(offset) };
unsafe { slot.msg.get().write(MaybeUninit::new(msg)) }
let slot = unsafe { (*block).get_slot_unchecked(offset) };
unsafe { slot.msg.get().write(msg) }
slot.state.fetch_or(WRITE, Ordering::Release);

// Wake a sleeping receiver.
@@ -300,18 +303,19 @@ impl<T> Channel<T> {
/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
let mut head;
let mut block;

loop {
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);

// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}

@@ -344,8 +348,6 @@ impl<T> Channel<T> {
// In that case, just wait until it gets initialized.
if block.is_null() {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}

@@ -358,7 +360,7 @@ impl<T> Channel<T> {
) {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
if offset == NEAR_BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
@@ -373,9 +375,7 @@ impl<T> Channel<T> {
token.list.offset = offset;
return true;
},
Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
Err(_) => {
backoff.spin();
}
}
@@ -392,14 +392,14 @@ impl<T> Channel<T> {
// Read the message.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = unsafe { (*block).slots.get_unchecked(offset) };
let slot = unsafe { (*block).get_slot_unchecked(offset) };
slot.wait_write();
let msg = unsafe { slot.msg.get().read().assume_init() };
let msg = unsafe { slot.msg.get().read() };

// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
unsafe {
if offset + 1 == BLOCK_CAP {
if offset == NEAR_BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
@@ -423,8 +423,10 @@ impl<T> Channel<T> {
msg: T,
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };

self.start_send(token);
unsafe {
self.write(token, msg)
.map_err(SendTimeoutError::Disconnected)
@@ -433,7 +435,8 @@ impl<T> Channel<T> {

/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };

if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
@@ -444,7 +447,9 @@ impl<T> Channel<T> {

/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let token = MaybeUninit::uninit();
let token = &mut unsafe { token.assume_init() };

loop {
// Try receiving a message several times.
let backoff = Backoff::new();
@@ -609,9 +614,9 @@ impl<T> Channel<T> {

if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
let slot = (*block).get_slot_unchecked(offset);
slot.wait_write();
(*slot.msg.get()).assume_init_drop();
drop(ptr::read(slot.msg.get()));
} else {
(*block).wait_next();
// Deallocate the block and move to the next one.
@@ -667,8 +672,9 @@ impl<T> Drop for Channel<T> {

if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
(*slot.msg.get()).assume_init_drop();
let msg =
ptr::read((*block).msgs.assume_init_ref().get_unchecked(offset).get());
drop(msg);
} else {
// Deallocate the block and move to the next one.
let next = *(*block).next.get_mut();
@@ -731,7 +737,8 @@ impl<T> SelectHandle for Receiver<'_, T> {

impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
self.0.start_send(token);
true
}

fn deadline(&self) -> Option<Instant> {