Skip to content

Commit

Permalink
Add select_biased! macro
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed May 4, 2024
1 parent 9c59813 commit 364bda6
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
39 changes: 24 additions & 15 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ enum Timeout {
fn run_select(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -193,8 +194,10 @@ fn run_select(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

// Create a token, which serves as a temporary variable that gets initialized in this function
// and is later used by a call to `channel::read()` or `channel::write()` that completes the
Expand Down Expand Up @@ -325,6 +328,7 @@ fn run_select(
fn run_ready(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<usize> {
if handles.is_empty() {
// Wait until the timeout and return.
Expand All @@ -341,8 +345,10 @@ fn run_ready(
}
}

// Shuffle the operations for fairness.
utils::shuffle(handles);
if !is_biased {
// Shuffle the operations for fairness.
utils::shuffle(handles);
}

loop {
let backoff = Backoff::new();
Expand Down Expand Up @@ -451,7 +457,7 @@ fn run_ready(
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
) -> Result<SelectedOperation<'a>, TrySelectError> {
match run_select(handles, Timeout::Now) {
match run_select(handles, Timeout::Now, false) {
None => Err(TrySelectError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand All @@ -467,12 +473,13 @@ pub fn try_select<'a>(
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
is_biased: bool,
) -> SelectedOperation<'a> {
if handles.is_empty() {
panic!("no operations have been added to `Select`");
}

let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
SelectedOperation {
token,
index,
Expand All @@ -487,10 +494,11 @@ pub fn select<'a>(
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => select_deadline(handles, deadline),
None => Ok(select(handles)),
Some(deadline) => select_deadline(handles, deadline, is_biased),
None => Ok(select(handles, is_biased)),
}
}

Expand All @@ -499,8 +507,9 @@ pub fn select_timeout<'a>(
pub(crate) fn select_deadline<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
deadline: Instant,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match run_select(handles, Timeout::At(deadline)) {
match run_select(handles, Timeout::At(deadline), is_biased) {
None => Err(SelectTimeoutError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
Expand Down Expand Up @@ -815,7 +824,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn select(&mut self) -> SelectedOperation<'a> {
select(&mut self.handles)
select(&mut self.handles, false)
}

/// Blocks for a limited time until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -869,7 +878,7 @@ impl<'a> Select<'a> {
&mut self,
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_timeout(&mut self.handles, timeout)
select_timeout(&mut self.handles, timeout, false)
}

/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -925,7 +934,7 @@ impl<'a> Select<'a> {
&mut self,
deadline: Instant,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_deadline(&mut self.handles, deadline)
select_deadline(&mut self.handles, deadline, false)
}

/// Attempts to find a ready operation without blocking.
Expand Down Expand Up @@ -964,7 +973,7 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
match run_ready(&mut self.handles, Timeout::Now) {
match run_ready(&mut self.handles, Timeout::Now, false) {
None => Err(TryReadyError),
Some(index) => Ok(index),
}
Expand Down Expand Up @@ -1021,7 +1030,7 @@ impl<'a> Select<'a> {
panic!("no operations have been added to `Select`");
}

run_ready(&mut self.handles, Timeout::Never).unwrap()
run_ready(&mut self.handles, Timeout::Never, false).unwrap()
}

/// Blocks for a limited time until one of the operations becomes ready.
Expand Down Expand Up @@ -1122,7 +1131,7 @@ impl<'a> Select<'a> {
/// # t2.join().unwrap(); // join thread to avoid https://github.com/rust-lang/miri/issues/1371
/// ```
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
match run_ready(&mut self.handles, Timeout::At(deadline)) {
match run_ready(&mut self.handles, Timeout::At(deadline), false) {
None => Err(ReadyTimeoutError),
Some(index) => Ok(index),
}
Expand Down
38 changes: 32 additions & 6 deletions crossbeam-channel/src/select_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ macro_rules! crossbeam_channel_internal {
$cases:tt
) => {{
let _oper: $crate::SelectedOperation<'_> = {
let _oper = $crate::internal::select(&mut $sel);
let _oper = $crate::internal::select(&mut $sel, _IS_BIASED);

// Erase the lifetime so that `sel` can be dropped early even without NLL.
unsafe { ::std::mem::transmute(_oper) }
Expand Down Expand Up @@ -802,7 +802,7 @@ macro_rules! crossbeam_channel_internal {
$cases:tt
) => {{
let _oper: ::std::option::Option<$crate::SelectedOperation<'_>> = {
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout);
let _oper = $crate::internal::select_timeout(&mut $sel, $timeout, false);

// Erase the lifetime so that `sel` can be dropped early even without NLL.
unsafe { ::std::mem::transmute(_oper) }
Expand Down Expand Up @@ -985,7 +985,8 @@ macro_rules! crossbeam_channel_internal {
///
/// This macro allows you to define a set of channel operations, wait until any one of them becomes
/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
/// among them is selected.
/// among them is selected (i.e. the unbiased selection). Use `select_biased!` for the biased
/// selection.
///
/// It is also possible to define a `default` case that gets executed if none of the operations are
/// ready, either right away or for a certain duration of time.
Expand Down Expand Up @@ -1121,8 +1122,33 @@ macro_rules! crossbeam_channel_internal {
#[macro_export]
macro_rules! select {
($($tokens:tt)*) => {
$crate::crossbeam_channel_internal!(
$($tokens)*
)
{
const _IS_BIASED: bool = false;

$crate::crossbeam_channel_internal!(
$($tokens)*
)
}
};
}

/// Selects from a set of channel operations.
///
/// This macro allows you to define a list of channel operations, wait until any one of them
/// becomes ready, and finally execute it. If multiple operations are ready at the same time, the
/// operation nearest to the front of the list is always selected (i.e. the biased selection). Use
/// [`select!`] for the unbiased selection.
///
/// Otherwise, this macro's functionality is identical to [`select!`]. Refer to it for the syntax.
#[macro_export]
macro_rules! select_biased {
($($tokens:tt)*) => {
{
const _IS_BIASED: bool = true;

$crate::crossbeam_channel_internal!(
$($tokens)*
)
}
};
}
2 changes: 2 additions & 0 deletions crossbeam-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ macro_rules! select {
(
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+
) => ({
const _IS_BIASED: bool = false;

cc::crossbeam_channel_internal! {
$(
$meth(($rx).inner) -> res => {
Expand Down
35 changes: 34 additions & 1 deletion crossbeam-channel/tests/select_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::ops::Deref;
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_channel::{after, bounded, never, select, tick, unbounded};
use crossbeam_channel::{after, bounded, never, select, select_biased, tick, unbounded};
use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
use crossbeam_utils::thread::scope;

Expand Down Expand Up @@ -943,6 +943,39 @@ fn fairness_send() {
assert!(hits.iter().all(|x| *x >= COUNT / 4));
}

#[test]
fn unfairness() {
#[cfg(miri)]
const COUNT: usize = 100;
#[cfg(not(miri))]
const COUNT: usize = 10_000;

let (s1, r1) = unbounded::<()>();
let (s2, r2) = unbounded::<()>();

for _ in 0..COUNT {
s1.send(()).unwrap();
s2.send(()).unwrap();
}

let mut hits = [0usize; 2];
for _ in 0..COUNT {
select_biased! {
recv(r1) -> _ => hits[0] += 1,
recv(r2) -> _ => hits[1] += 1,
}
}
assert_eq!(hits, [COUNT, 0]);

for _ in 0..COUNT {
select_biased! {
recv(r1) -> _ => hits[0] += 1,
recv(r2) -> _ => hits[1] += 1,
}
}
assert_eq!(hits, [COUNT, COUNT]);
}

#[allow(clippy::or_fun_call, clippy::unnecessary_literal_unwrap)] // This is intentional.
#[test]
fn references() {
Expand Down

0 comments on commit 364bda6

Please sign in to comment.