Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webgpu: Use WGPU poller thread for poll_all_devices #32266

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 7 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ webpki-roots = "0.25"
webrender = { git = "https://github.com/servo/webrender", branch = "0.64", features = ["capture"] }
webrender_api = { git = "https://github.com/servo/webrender", branch = "0.64" }
webrender_traits = { path = "components/shared/webrender" }
wgpu-core = "0.20"
wgpu-types = "0.20"
wgpu-core = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
wgpu-types = { git = "https://github.com/gfx-rs/wgpu", rev = "d0a5e48aa7e84683114c3870051cc414ae92ac03" }
winapi = "0.3"
xi-unicode = "0.1.0"
xml5ever = "0.18"
Expand Down
3 changes: 2 additions & 1 deletion components/webgpu/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use wgpu_thread::WGPU;
pub use {wgpu_core as wgc, wgpu_types as wgt};

pub mod identity;
mod poll_thread;
mod wgpu_thread;

use std::borrow::Cow;
Expand Down Expand Up @@ -86,7 +87,7 @@ impl WebGPU {
.run();
})
{
warn!("Failed to spwan WGPU thread ({})", e);
warn!("Failed to spawn WGPU thread ({})", e);
return None;
}
Some((WebGPU(sender), script_recv))
Expand Down
126 changes: 126 additions & 0 deletions components/webgpu/poll_thread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

//! Data and main loop of WGPU poll thread.
//!
//! This is roughly based on <https://github.com/LucentFlux/wgpu-async/blob/1322c7e3fcdfc1865a472c7bbbf0e2e06dcf4da8/src/wgpu_future.rs>

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;

use log::warn;

use crate::wgc::global::Global;

/// Polls devices while there is something to poll.
///
/// This objects corresponds to a thread that parks itself when there is no work,
/// waiting on it, and then calls `poll_all_devices` repeatedly to block.
///
/// The thread dies when this object is dropped, and all work in submission is done.
///
/// ## Example
/// ```no_run
/// let token = self.poller.token(); // create a new token
/// let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
/// drop(token); // drop token as closure has been fired
/// // ...
/// }));
/// let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
/// self.poller.wake(); // wake poller thread to actually poll
/// ```
#[derive(Debug)]
pub(crate) struct Poller {
/// The number of closures that still needs to be fired.
/// When this is 0, the thread can park itself.
work_count: Arc<AtomicUsize>,
/// True if thread should die after all work in submission is done
is_done: Arc<AtomicBool>,
/// Handle to the WGPU poller thread (to be used for unparking the thread)
handle: Option<JoinHandle<()>>,
}

#[inline]
fn poll_all_devices(global: &Arc<Global>, more_work: &mut bool, force_wait: bool) {
match global.poll_all_devices(force_wait) {
Ok(all_queue_empty) => *more_work = !all_queue_empty,
Err(e) => warn!("Poller thread got `{e}` on poll_all_devices."),
}
}

impl Poller {
pub(crate) fn new(global: Arc<Global>) -> Self {
let work_count = Arc::new(AtomicUsize::new(0));
let is_done = Arc::new(AtomicBool::new(false));
let work = work_count.clone();
let done = is_done.clone();
Self {
work_count,
is_done,
handle: Some(
std::thread::Builder::new()
.name("WGPU poller".into())
.spawn(move || {
while !done.load(Ordering::Acquire) {
let mut more_work = false;
// Do non-blocking poll unconditionally
// so every `ẁake` (even spurious) will do at least one poll.
// this is mostly useful for stuff that is deferred
// to maintain calls in wgpu (device resource destruction)
poll_all_devices(&global, &mut more_work, false);
while more_work || work.load(Ordering::Acquire) != 0 {
poll_all_devices(&global, &mut more_work, true);
}
std::thread::park(); //TODO: should we use timeout here
}
})
.expect("Spawning thread should not fail"),
),
}
}

/// Creates a token of work
pub(crate) fn token(&self) -> WorkToken {
let prev = self.work_count.fetch_add(1, Ordering::AcqRel);
debug_assert!(
prev < usize::MAX,
"cannot have more than `usize::MAX` outstanding operations on the GPU"
);
WorkToken {
work_count: Arc::clone(&self.work_count),
}
}

/// Wakes the poller thread to start polling.
pub(crate) fn wake(&self) {
self.handle
.as_ref()
.expect("Poller thread does not exist!")
.thread()
.unpark();
}
}

impl Drop for Poller {
fn drop(&mut self) {
sagudev marked this conversation as resolved.
Show resolved Hide resolved
self.is_done.store(true, Ordering::Release);

let handle = self.handle.take().expect("Poller dropped twice");
handle.thread().unpark();
handle.join().expect("Poller thread panicked");
}
}

/// RAII indicating that there is some work enqueued (closure to be fired),
/// while this token is held.
pub(crate) struct WorkToken {
work_count: Arc<AtomicUsize>,
}

impl Drop for WorkToken {
fn drop(&mut self) {
self.work_count.fetch_sub(1, Ordering::AcqRel);
}
}
42 changes: 21 additions & 21 deletions components/webgpu/wgpu_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::slice;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use arrayvec::ArrayVec;
use euclid::default::Size2D;
Expand All @@ -27,12 +26,12 @@ use wgc::{gfx_select, id};
use wgt::InstanceDescriptor;
pub use {wgpu_core as wgc, wgpu_types as wgt};

use crate::poll_thread::Poller;
use crate::{
ErrorScopeId, PresentationData, Transmute, WebGPU, WebGPUAdapter, WebGPUDevice, WebGPUMsg,
WebGPUOpResult, WebGPUQueue, WebGPURequest, WebGPUResponse,
};

const DEVICE_POLL_INTERVAL: Duration = Duration::from_millis(50);
pub const PRESENTATION_BUFFER_COUNT: usize = 10;

#[allow(clippy::upper_case_acronyms)] // Name of the library
Expand All @@ -51,7 +50,7 @@ pub(crate) struct WGPU {
webrender_document: DocumentId,
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
last_poll: Instant,
poller: Poller,
}

impl WGPU {
Expand All @@ -64,17 +63,19 @@ impl WGPU {
external_images: Arc<Mutex<WebrenderExternalImageRegistry>>,
wgpu_image_map: Arc<Mutex<HashMap<u64, PresentationData>>>,
) -> Self {
let global = Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
));
WGPU {
poller: Poller::new(Arc::clone(&global)),
receiver,
sender,
script_sender,
global: Arc::new(wgc::global::Global::new(
"wgpu-core",
InstanceDescriptor {
backends: wgt::Backends::PRIMARY,
..Default::default()
},
)),
global,
adapters: Vec::new(),
devices: HashMap::new(),
_invalid_adapters: Vec::new(),
Expand All @@ -83,21 +84,12 @@ impl WGPU {
webrender_document,
external_images,
wgpu_image_map,
last_poll: Instant::now(),
}
}

pub(crate) fn run(&mut self) {
loop {
let diff = DEVICE_POLL_INTERVAL.checked_sub(self.last_poll.elapsed());
if diff.is_none() {
let _ = self.global.poll_all_devices(false);
self.last_poll = Instant::now();
}
if let Ok((scope_id, msg)) = self
.receiver
.try_recv_timeout(diff.unwrap_or(DEVICE_POLL_INTERVAL))
{
if let Ok((scope_id, msg)) = self.receiver.recv() {
match msg {
WebGPURequest::BufferMapAsync {
sender,
Expand All @@ -109,7 +101,9 @@ impl WGPU {
} => {
let glob = Arc::clone(&self.global);
let resp_sender = sender.clone();
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
Expand Down Expand Up @@ -151,6 +145,7 @@ impl WGPU {
size,
operation
));
self.poller.wake();
if let Err(ref e) = result {
if let Err(w) = sender.send(Some(Err(format!("{:?}", e)))) {
warn!("Failed to send BufferMapAsync Response ({:?})", w);
Expand Down Expand Up @@ -818,7 +813,9 @@ impl WGPU {
let wgpu_image_map = Arc::clone(&self.wgpu_image_map);
let webrender_api = Arc::clone(&self.webrender_api);
let webrender_document = self.webrender_document;
let token = self.poller.token();
let callback = BufferMapCallback::from_rust(Box::from(move |result| {
drop(token);
match result {
Ok(()) => {
let global = &glob;
Expand Down Expand Up @@ -866,6 +863,7 @@ impl WGPU {
};
let _ = gfx_select!(buffer_id
=> global.buffer_map_async(buffer_id, 0, Some(buffer_size), map_op));
self.poller.wake();
},
WebGPURequest::UnmapBuffer {
buffer_id,
Expand Down Expand Up @@ -928,14 +926,16 @@ impl WGPU {
},
WebGPURequest::QueueOnSubmittedWorkDone { sender, queue_id } => {
let global = &self.global;

let token = self.poller.token();
let callback = SubmittedWorkDoneClosure::from_rust(Box::from(move || {
drop(token);
if let Err(e) = sender.send(Some(Ok(WebGPUResponse::SubmittedWorkDone)))
{
warn!("Could not send SubmittedWorkDone Response ({})", e);
}
}));
let result = gfx_select!(queue_id => global.queue_on_submitted_work_done(queue_id, callback));
self.poller.wake();
self.send_result(queue_id.transmute(), scope_id, result);
},
WebGPURequest::DropTexture(id) => {
Expand Down