Skip to content

Commit

Permalink
implement BodyReader for easy consumption of Body
Browse files Browse the repository at this point in the history
I need `serde_json::from_reader(req.body_mut().reader())`.
  • Loading branch information
dpc authored and ibraheemdev committed Sep 9, 2023
1 parent fdbe92b commit 42b8396
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::executor;

use core::fmt;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, debug_assert, io};

use futures_core::Stream;
use hyper::body::HttpBody;
Expand Down Expand Up @@ -109,6 +109,14 @@ impl Body {
{
Body(hyper::Body::wrap_stream(ReaderStream::new(reader)))
}

/// Create a [`BodyReader`] that implements [`std::io::Read`].
pub fn reader(&mut self) -> BodyReader<'_> {
BodyReader {
body: self,
prev_bytes: Bytes::new(),
}
}
}

impl<T> From<T> for Body
Expand All @@ -134,6 +142,48 @@ impl Iterator for Body {
}
}

/// Wraps [`Body`] and implements [`std::io::Read`]
pub struct BodyReader<'b> {
body: &'b mut Body,
prev_bytes: Bytes,
}

impl<'b> std::io::Read for BodyReader<'b> {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
let mut written = 0;
loop {
if buf.is_empty() {
return Ok(written);
}

if !self.prev_bytes.is_empty() {
let chunk_size = cmp::min(buf.len(), self.prev_bytes.len());
let prev_bytes_rest = self.prev_bytes.split_to(chunk_size);
buf[..chunk_size].copy_from_slice(&self.prev_bytes[..chunk_size]);
self.prev_bytes = prev_bytes_rest;
buf = &mut buf[chunk_size..];
written += chunk_size;
continue;
}

if written != 0 {
// pulling from an interator can block, and we have something to return
// already, so return it
return Ok(written);
}

debug_assert!(self.prev_bytes.is_empty());
debug_assert!(written == 0);

self.prev_bytes = if let Some(next) = self.body.next() {
next?
} else {
return Ok(written);
}
}
}
}

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
Expand Down

0 comments on commit 42b8396

Please sign in to comment.