Skip to content

Commit 1207c2b

Browse files
committed
feat(client): introduce lower-level Connection API
Closes #1449
1 parent 0786ea1 commit 1207c2b

19 files changed

+1813
-791
lines changed

benches/end_to_end.rs

+21-23
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@ extern crate tokio_core;
88

99
use std::net::SocketAddr;
1010

11-
use futures::{future, Future, Stream};
11+
use futures::{Future, Stream};
1212
use tokio_core::reactor::{Core, Handle};
1313
use tokio_core::net::TcpListener;
1414

1515
use hyper::client;
1616
use hyper::header::{ContentLength, ContentType};
1717
use hyper::Method;
18-
use hyper::server::{self, Service};
1918

2019

2120
#[bench]
@@ -42,13 +41,15 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
4241

4342
#[bench]
4443
fn post_one_at_a_time(b: &mut test::Bencher) {
44+
extern crate pretty_env_logger;
45+
let _ = pretty_env_logger::try_init();
4546
let mut core = Core::new().unwrap();
4647
let handle = core.handle();
4748
let addr = spawn_hello(&handle);
4849

4950
let client = hyper::Client::new(&handle);
5051

51-
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
52+
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
5253

5354
let post = "foo bar baz quux";
5455
b.bytes = 180 * 2 + post.len() as u64 + PHRASE.len() as u64;
@@ -69,35 +70,32 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
6970

7071
static PHRASE: &'static [u8] = include_bytes!("../CHANGELOG.md"); //b"Hello, World!";
7172

72-
#[derive(Clone, Copy)]
73-
struct Hello;
74-
75-
impl Service for Hello {
76-
type Request = server::Request;
77-
type Response = server::Response;
78-
type Error = hyper::Error;
79-
type Future = future::FutureResult<Self::Response, hyper::Error>;
80-
fn call(&self, _req: Self::Request) -> Self::Future {
81-
future::ok(
82-
server::Response::new()
83-
.with_header(ContentLength(PHRASE.len() as u64))
84-
.with_header(ContentType::plaintext())
85-
.with_body(PHRASE)
86-
)
87-
}
88-
89-
}
90-
9173
fn spawn_hello(handle: &Handle) -> SocketAddr {
74+
use hyper::server::{const_service, service_fn, NewService, Request, Response};
9275
let addr = "127.0.0.1:0".parse().unwrap();
9376
let listener = TcpListener::bind(&addr, handle).unwrap();
9477
let addr = listener.local_addr().unwrap();
9578

9679
let handle2 = handle.clone();
9780
let http = hyper::server::Http::<hyper::Chunk>::new();
81+
82+
let service = const_service(service_fn(|req: Request| {
83+
req.body()
84+
.concat2()
85+
.map(|_| {
86+
Response::<hyper::Body>::new()
87+
.with_header(ContentLength(PHRASE.len() as u64))
88+
.with_header(ContentType::plaintext())
89+
.with_body(PHRASE)
90+
})
91+
}));
92+
93+
let mut conns = 0;
9894
handle.spawn(listener.incoming().for_each(move |(socket, _addr)| {
95+
conns += 1;
96+
assert_eq!(conns, 1, "should only need 1 connection");
9997
handle2.spawn(
100-
http.serve_connection(socket, Hello)
98+
http.serve_connection(socket, service.new_service()?)
10199
.map(|_| ())
102100
.map_err(|_| ())
103101
);

src/client/cancel.rs

-154
This file was deleted.

0 commit comments

Comments
 (0)