Skip to content

Commit

Permalink
poem-grpc: message can span multiple frame (#817)
Browse files Browse the repository at this point in the history
When a response was big enough to be split into multiple frames,
the response decoder was incorrectly checking the frame decoder
for an incomplete frame before receiving the remaining bytes.
  • Loading branch information
gwik committed May 18, 2024
1 parent 393ec48 commit e7de9e0
Showing 1 changed file with 58 additions and 1 deletion.
59 changes: 58 additions & 1 deletion poem-grpc/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ pub(crate) fn create_decode_response_body<T: Decoder>(
let message = decoder.decode(&data).map_err(Status::from_std_error)?;
yield message;
}
frame_decoder.check_incomplete()?;
} else if frame.is_trailers() {
frame_decoder.check_incomplete()?;
let headers = frame.into_trailers().unwrap();
status = Some(Status::from_headers(&headers)?
.ok_or_else(|| Status::new(Code::Internal)
Expand All @@ -209,3 +209,60 @@ pub(crate) fn create_decode_response_body<T: Decoder>(
}
}))
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use futures_util::TryStreamExt;
use http::HeaderMap;
use poem::Body;
use prost::Message;

use super::create_decode_response_body;
use crate::codec::{Codec, ProstCodec};

#[derive(Clone, PartialEq, Message)]
struct TestMsg {
#[prost(string, tag = 1)]
value: String,
}

#[tokio::test]
async fn msg_data_spans_multiple_frames() {
// Split and encoded message into multiple frames.
let msg = TestMsg {
value:
"A program is like a poem, you cannot write a poem without writing it. --- Dijkstra"
.into(),
};
let encoded = msg.encode_to_vec();
let len = encoded.len();

// Compression flag + u32 big endian size
let mut buffer = vec![0];
buffer.extend((len as u32).to_be_bytes());
buffer.extend(encoded);

// Split the data into multiple frames.
let (first_frame, second_frame) = buffer.split_at(len / 2);

let bytes_stream = futures_util::stream::iter(vec![
Ok::<_, std::io::Error>(Bytes::from(first_frame.to_vec())),
Ok(Bytes::from(second_frame.to_vec())),
]);
let body = Body::from_bytes_stream(bytes_stream);

let mut codec = ProstCodec::<TestMsg, TestMsg>::default();
let mut streaming =
create_decode_response_body(codec.decoder(), &HeaderMap::default(), body)
.expect("streaming");

let stream_msg = streaming
.try_next()
.await
.expect("msg")
.expect("one message");

assert_eq!(msg, stream_msg);
}
}

0 comments on commit e7de9e0

Please sign in to comment.