Skip to content

Commit

Permalink
:) (#233)
Browse files Browse the repository at this point in the history
* :)

* insert subscription oneshot in execute

* add astro dist gitkeep

* unstable fix
  • Loading branch information
Brendonovich committed Oct 10, 2023
1 parent 799687f commit 899a185
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ default = ["typescript"]
typescript = ["rspc-core/typescript", "specta/typescript"]
tracing = ["rspc-core/tracing", "dep:tracing"]

unstable = [] # APIs where one line of code can blow up your whole app
unstable = ["rspc-core/unstable"] # APIs where one line of code can blow up your whole app

[dependencies]
rspc-core = { path = "./crates/core" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
default = []
typescript = ["specta/typescript"]
tracing = ["dep:tracing"]
unstable = []

[dependencies]
specta = { workspace = true, features = ["typescript"] } # TODO: `typescript` should be required
Expand Down
9 changes: 3 additions & 6 deletions crates/core/src/exec/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ where

match res {
ExecutorResult::Task(task) => {
let task_id = task.id;
self.streams.insert(task);
self.subscriptions.shutdown(task_id);
}
ExecutorResult::Future(fut) => {
self.streams.insert(fut.into());
Expand Down Expand Up @@ -92,8 +90,7 @@ pub async fn run_connection<

loop {
if !batch.is_empty() {
let batch = batch.drain(..batch.len()).collect::<Vec<_>>();
if let Err(_err) = socket.send(batch).await {
if let Err(_err) = socket.send(std::mem::take(&mut batch)).await {
#[cfg(feature = "tracing")]
tracing::error!("Error sending message to websocket: {}", _err);
}
Expand Down Expand Up @@ -159,7 +156,7 @@ pub async fn run_connection<
StreamYield::Item(resp) => batch.push(resp),
StreamYield::Finished(f) => {
if let Some(stream) = f.take(Pin::new(&mut conn.streams)) {
conn.subscriptions._internal_remove(stream.id);
conn.subscriptions.remove(stream.id);
}
}
}
Expand All @@ -171,7 +168,7 @@ pub async fn run_connection<
}
StreamYield::Finished(f) => {
if let Some(stream) = f.take(Pin::new(&mut conn.streams)) {
conn.subscriptions._internal_remove(stream.id);
conn.subscriptions.remove(stream.id);
}
}
}
Expand Down
23 changes: 16 additions & 7 deletions crates/core/src/exec/execute.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{pin::Pin, sync::Arc};

use futures::channel::oneshot;

use crate::{
body::Body,
error::ExecError,
Expand All @@ -11,7 +13,7 @@ use crate::{
Router,
};

use super::{task, SubscriptionMap};
use super::SubscriptionMap;

/// TODO
///
Expand Down Expand Up @@ -60,13 +62,20 @@ impl<TCtx: Send + 'static> Router<TCtx> {
Some(subs) if subs.contains_key(data.id) => {
Err(ExecError::ErrSubscriptionDuplicateId)
}
Some(_) => match get_subscription(self, ctx, data) {
Some(subs) => match get_subscription(self, ctx, data) {
None => Err(ExecError::OperationNotFound),
Some(stream) => Ok(ExecutorResult::Task(Task {
id,
stream,
status: task::Status::ShouldBePolled { done: false },
})),
Some(stream) => {
let (tx, rx) = oneshot::channel();

subs.insert(id, tx);

Ok(ExecutorResult::Task(Task {
id,
stream,
done: false,
shutdown_rx: Some(rx),
}))
}
},
}
.unwrap_or_else(|e| {
Expand Down
14 changes: 12 additions & 2 deletions crates/core/src/exec/subscription_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ impl SubscriptionMap {
}
}

pub fn insert(&mut self, id: u32, tx: oneshot::Sender<()>) {
self.map.insert(id, tx);
}

// We remove but don't shutdown. This should be used when we know the subscription is shutdown.
pub(crate) fn _internal_remove(&mut self, id: u32) {
self.map.remove(&id);
pub(crate) fn remove(&mut self, id: u32) {
if let Some(tx) = self.map.remove(&id) {
#[cfg(debug_assertions)]
#[allow(clippy::panic)]
if !tx.is_canceled() {
panic!("Subscription was not shutdown before being removed!");
}
};
}
}
42 changes: 20 additions & 22 deletions crates/core/src/exec/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fmt, pin::Pin, task::Poll};

use futures::{ready, Stream};
use futures::{channel::oneshot, ready, stream::FusedStream, FutureExt, Stream};

use crate::body::Body;
use crate::exec;
Expand All @@ -9,20 +9,15 @@ use super::{arc_ref::ArcRef, request_future::RequestFuture};

// TODO: Should this be called `Task` or `StreamWrapper`? Will depend on it's final form.

// TODO: Replace with FusedStream in dev if possible???
pub enum Status {
ShouldBePolled { done: bool },
DoNotPoll,
}

// TODO: docs
pub struct Task {
pub(crate) id: u32,
// You will notice this is a `Stream` not a `Future` like would be implied by the struct.
// rspc's whole middleware system only uses `Stream`'s cause it makes life easier so we change to & from a `Future` at the start/end.
pub(crate) stream: ArcRef<Pin<Box<dyn Body + Send>>>,
// Mark when the stream is done. This means `self.reference` returned `None` but we still had to yield the complete message so we haven't returned `None` yet.
pub(crate) status: Status,
pub(crate) done: bool,
pub(crate) shutdown_rx: Option<oneshot::Receiver<()>>,
}

impl fmt::Debug for Task {
Expand All @@ -40,17 +35,13 @@ impl Stream for Task {
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match &self.status {
#[allow(clippy::panic)]
Status::DoNotPoll => {
#[cfg(debug_assertions)]
panic!("`StreamWrapper` polled after completion")
}
Status::ShouldBePolled { done } => {
if *done {
self.status = Status::DoNotPoll;
return Poll::Ready(None);
}
if self.done {
return Poll::Ready(None);
}

if let Some(shutdown_rx) = self.shutdown_rx.as_mut() {
if shutdown_rx.poll_unpin(cx).is_ready() {
self.done = true;
}
}

Expand All @@ -64,8 +55,8 @@ impl Stream for Task {
},
None => {
let id = self.id;
cx.waker().wake_by_ref(); // We want the stream to be called again so we can return `None` and close it
self.status = Status::ShouldBePolled { done: true };
self.done = true;
cx.waker().wake_by_ref();
exec::Response {
id,
inner: exec::ResponseInner::Complete,
Expand All @@ -80,12 +71,19 @@ impl Stream for Task {
}
}

impl FusedStream for Task {
fn is_terminated(&self) -> bool {
self.done
}
}

impl From<RequestFuture> for Task {
fn from(value: RequestFuture) -> Self {
Self {
id: value.id,
stream: value.stream,
status: Status::ShouldBePolled { done: false },
done: false,
shutdown_rx: None,
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions crates/core/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,7 @@ use specta::{
TypeMap,
};

use crate::{
error::ExportError,
internal::ProcedureDef,
middleware::ProcedureKind,
procedure_store::{ProcedureTodo, ProceduresDef},
router_builder::ProcedureMap,
};
use crate::{error::ExportError, procedure_store::ProceduresDef, router_builder::ProcedureMap};

// TODO: Break this out into it's own file
/// ExportConfig is used to configure how rspc will export your types.
Expand Down Expand Up @@ -198,18 +192,24 @@ where
}
}

// TODO: Plz try and get rid of these. They are escape hatches for Spacedrive's invalidation system that is dearly in need of a makeover.
#[cfg(feature = "unstable")]
impl<TCtx> Router<TCtx> {
pub fn queries(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.queries
}
mod unstable {
use std::collections::BTreeMap;

pub fn mutations(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.mutations
}
use crate::internal::ProcedureTodo;

// TODO: Plz try and get rid of these. They are escape hatches for Spacedrive's invalidation system that is dearly in need of a makeover.
impl<TCtx> super::Router<TCtx> {
pub fn queries(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.queries
}

pub fn mutations(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.mutations
}

pub fn subscriptions(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.subscriptions
pub fn subscriptions(&self) -> &BTreeMap<String, ProcedureTodo<TCtx>> {
&self.subscriptions
}
}
}
1 change: 1 addition & 0 deletions examples/astro/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# build output
dist/
!dist/.gitkeep
# generated types
.astro/

Expand Down

0 comments on commit 899a185

Please sign in to comment.