Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,16 @@ impl Body {
ping.record_data(bytes.len());
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => match e.reason() {
// These reasons should cause stop of body reading, but nor fail it.
// The same logic as for `AsyncRead for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None),
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
},
Some(Err(e)) => {
if let Some(h2::Reason::NO_ERROR) = e.reason() {
// As mentioned in RFC 7540 Section 8.1, a RST_STREAM with NO_ERROR
// indicates an early response, and should cause the body reading
// to stop, but not fail it:
Poll::Ready(None)
} else {
Poll::Ready(Some(Err(crate::Error::new_body(e))))
}
}
None => Poll::Ready(None),
},

Expand Down Expand Up @@ -389,7 +393,16 @@ impl HttpBody for Body {
ping.record_non_data();
Poll::Ready(Ok(t))
}
Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
Err(e) => {
if let Some(h2::Reason::NO_ERROR) = e.reason() {
// Same as above, a RST_STREAM with NO_ERROR indicates an early
// response, and should cause reading the trailers to stop, but
// not fail it:
Poll::Ready(Ok(None))
} else {
Poll::Ready(Err(crate::Error::new_h2(e)))
}
}
},
Kind::Chan {
ref mut trailers_rx,
Expand Down
114 changes: 111 additions & 3 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3213,9 +3213,10 @@ mod conn {
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
async fn http2_responds_before_consuming_request_body_no_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::HttpBody;
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -3260,11 +3261,118 @@ mod conn {
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let body = concat(resp.into_body())
let mut body = resp.into_body();

let data = body
.data()
.await
.expect("response has body")
.expect("get response body with no error");
assert_eq!(data.as_ref(), b"No bread for you!");

let trailers = body
.trailers()
.await
.expect("get response trailers with no error");
assert!(trailers.is_none());
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body_with_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::{HttpBody, SizeHint};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

/// An `HttpBody` implementation whose `is_end_stream()` will
/// return `true` after sending trailers.
pub struct TrailersBody(Option<HeaderMap>);

impl HttpBody for TrailersBody {
type Data = bytes::Bytes;
type Error = hyper::Error;

fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(self.0.take()))
}

fn is_end_stream(&self) -> bool {
self.0.is_none()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(0)
}
}

let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr = listener.local_addr().unwrap();

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
hyper::server::conn::Http::new()
.http2_only(true)
.serve_connection(
sock,
service_fn(|_req| async move {
let mut trailers = HeaderMap::new();
trailers.insert("grpc", HeaderValue::from_static("0"));
let body = TrailersBody(Some(trailers));
Ok::<_, hyper::Error>(http::Response::new(body))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::Builder::new()
.http2_only(true)
.handshake::<_, Body>(io)
.await
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (tx, body) = hyper::Body::channel();
let req = Request::post("/a").body(body).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = resp.into_body();

let data = body.data().await;
assert!(data.is_none());

let trailers = body
.trailers()
.await
.expect("get response trailers with no error")
.expect("trailers HeaderMaps is present");

assert_eq!(body.as_ref(), b"No bread for you!");
assert_eq!(trailers.len(), 1);
assert_eq!(trailers.get("grpc").unwrap(), "0");
drop(tx);
}

#[tokio::test]
Expand Down