Skip to content

Commit 1e72a8a

Browse files
committed
feat(client): add a Connection Pool
This adds a connection pool to the Client that is used by default. It accepts any other NetworkConnector, and simply acts as a NetworkConnector itself. Other Pools can exist by simply providing a custom NetworkConnector. This Pool is only used by default if you also use the default connector, which is `HttpConnector`. If you wish to use the Pool with a custom connector, you'll need to create the Pool with your custom connector, and then pass that pool to the Client::with_connector. This also adds a method to `NetworkStream`, `close`, which can be used to know when the Stream should be put down, because a server requested that the connection close instead of be kept alive. Closes #363 Closes #41
1 parent 362044c commit 1e72a8a

File tree

7 files changed

+281
-15
lines changed

7 files changed

+281
-15
lines changed

src/client/mod.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,17 @@ use status::StatusClass::Redirection;
4646
use {Url, HttpResult};
4747
use HttpError::HttpUriError;
4848

49+
pub use self::pool::Pool;
4950
pub use self::request::Request;
5051
pub use self::response::Response;
5152

53+
pub mod pool;
5254
pub mod request;
5355
pub mod response;
5456

5557
/// A Client to use additional features with Requests.
5658
///
57-
/// Clients can handle things such as: redirect policy.
59+
/// Clients can handle things such as: redirect policy, connection pooling.
5860
pub struct Client {
5961
connector: Connector,
6062
redirect_policy: RedirectPolicy,
@@ -64,7 +66,12 @@ impl Client {
6466

6567
/// Create a new Client.
6668
pub fn new() -> Client {
67-
Client::with_connector(HttpConnector(None))
69+
Client::with_pool_config(Default::default())
70+
}
71+
72+
/// Create a new Client with a configured Pool Config.
73+
pub fn with_pool_config(config: pool::Config) -> Client {
74+
Client::with_connector(Pool::new(config))
6875
}
6976

7077
/// Create a new client with a specific connector.
@@ -78,7 +85,10 @@ impl Client {
7885

7986
/// Set the SSL verifier callback for use with OpenSSL.
8087
pub fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
81-
self.connector = with_connector(HttpConnector(Some(verifier)));
88+
self.connector = with_connector(Pool::with_connector(
89+
Default::default(),
90+
HttpConnector(Some(verifier))
91+
));
8292
}
8393

8494
/// Set the RedirectPolicy.

src/client/pool.rs

+227
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
//! Client Connection Pooling
2+
use std::borrow::ToOwned;
3+
use std::collections::HashMap;
4+
use std::io::{self, Read, Write};
5+
use std::net::{SocketAddr, Shutdown};
6+
use std::sync::{Arc, Mutex};
7+
8+
use net::{NetworkConnector, NetworkStream, HttpConnector};
9+
10+
/// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`.
11+
pub struct Pool<C: NetworkConnector> {
12+
connector: C,
13+
inner: Arc<Mutex<PoolImpl<<C as NetworkConnector>::Stream>>>
14+
}
15+
16+
/// Config options for the `Pool`.
17+
#[derive(Debug)]
18+
pub struct Config {
19+
/// The maximum idle connections *per host*.
20+
pub max_idle: usize,
21+
}
22+
23+
impl Default for Config {
24+
#[inline]
25+
fn default() -> Config {
26+
Config {
27+
max_idle: 5,
28+
}
29+
}
30+
}
31+
32+
#[derive(Debug)]
33+
struct PoolImpl<S> {
34+
conns: HashMap<Key, Vec<S>>,
35+
config: Config,
36+
}
37+
38+
type Key = (String, u16, Scheme);
39+
40+
fn key<T: Into<Scheme>>(host: &str, port: u16, scheme: T) -> Key {
41+
(host.to_owned(), port, scheme.into())
42+
}
43+
44+
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
45+
enum Scheme {
46+
Http,
47+
Https,
48+
Other(String)
49+
}
50+
51+
impl<'a> From<&'a str> for Scheme {
52+
fn from(s: &'a str) -> Scheme {
53+
match s {
54+
"http" => Scheme::Http,
55+
"https" => Scheme::Https,
56+
s => Scheme::Other(String::from(s))
57+
}
58+
}
59+
}
60+
61+
impl Pool<HttpConnector> {
62+
/// Creates a `Pool` with an `HttpConnector`.
63+
#[inline]
64+
pub fn new(config: Config) -> Pool<HttpConnector> {
65+
Pool::with_connector(config, HttpConnector(None))
66+
}
67+
}
68+
69+
impl<C: NetworkConnector> Pool<C> {
70+
/// Creates a `Pool` with a specified `NetworkConnector`.
71+
#[inline]
72+
pub fn with_connector(config: Config, connector: C) -> Pool<C> {
73+
Pool {
74+
connector: connector,
75+
inner: Arc::new(Mutex::new(PoolImpl {
76+
conns: HashMap::new(),
77+
config: config,
78+
}))
79+
}
80+
}
81+
82+
/// Clear all idle connections from the Pool, closing them.
83+
#[inline]
84+
pub fn clear_idle(&mut self) {
85+
self.inner.lock().unwrap().conns.clear();
86+
}
87+
}
88+
89+
impl<S> PoolImpl<S> {
90+
fn reuse(&mut self, key: Key, conn: S) {
91+
trace!("reuse {:?}", key);
92+
let conns = self.conns.entry(key).or_insert(vec![]);
93+
if conns.len() < self.config.max_idle {
94+
conns.push(conn);
95+
}
96+
}
97+
}
98+
99+
impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector for Pool<C> {
100+
type Stream = PooledStream<S>;
101+
fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result<PooledStream<S>> {
102+
let key = key(host, port, scheme);
103+
let mut locked = self.inner.lock().unwrap();
104+
let mut should_remove = false;
105+
let conn = match locked.conns.get_mut(&key) {
106+
Some(ref mut vec) => {
107+
should_remove = vec.len() == 1;
108+
vec.pop().unwrap()
109+
}
110+
_ => try!(self.connector.connect(host, port, scheme))
111+
};
112+
if should_remove {
113+
locked.conns.remove(&key);
114+
}
115+
Ok(PooledStream {
116+
inner: Some((key, conn)),
117+
is_closed: false,
118+
is_drained: false,
119+
pool: self.inner.clone()
120+
})
121+
}
122+
}
123+
124+
/// A Stream that will try to be returned to the Pool when dropped.
125+
pub struct PooledStream<S> {
126+
inner: Option<(Key, S)>,
127+
is_closed: bool,
128+
is_drained: bool,
129+
pool: Arc<Mutex<PoolImpl<S>>>
130+
}
131+
132+
impl<S: NetworkStream> Read for PooledStream<S> {
133+
#[inline]
134+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
135+
match self.inner.as_mut().unwrap().1.read(buf) {
136+
Ok(0) => {
137+
self.is_drained = true;
138+
Ok(0)
139+
}
140+
r => r
141+
}
142+
}
143+
}
144+
145+
impl<S: NetworkStream> Write for PooledStream<S> {
146+
#[inline]
147+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
148+
self.inner.as_mut().unwrap().1.write(buf)
149+
}
150+
151+
#[inline]
152+
fn flush(&mut self) -> io::Result<()> {
153+
self.inner.as_mut().unwrap().1.flush()
154+
}
155+
}
156+
157+
impl<S: NetworkStream> NetworkStream for PooledStream<S> {
158+
#[inline]
159+
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
160+
self.inner.as_mut().unwrap().1.peer_addr()
161+
}
162+
163+
#[inline]
164+
fn close(&mut self, how: Shutdown) -> io::Result<()> {
165+
self.is_closed = true;
166+
self.inner.as_mut().unwrap().1.close(how)
167+
}
168+
}
169+
170+
impl<S> Drop for PooledStream<S> {
171+
fn drop(&mut self) {
172+
trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained);
173+
if !self.is_closed && self.is_drained {
174+
self.inner.take().map(|(key, conn)| {
175+
if let Ok(mut pool) = self.pool.lock() {
176+
pool.reuse(key, conn);
177+
}
178+
// else poisoned, give up
179+
});
180+
}
181+
}
182+
}
183+
184+
#[cfg(test)]
185+
mod tests {
186+
use std::net::Shutdown;
187+
use mock::MockConnector;
188+
use net::{NetworkConnector, NetworkStream};
189+
190+
use super::{Pool, key};
191+
192+
macro_rules! mocked {
193+
() => ({
194+
Pool::with_connector(Default::default(), MockConnector)
195+
})
196+
}
197+
198+
#[test]
199+
fn test_connect_and_drop() {
200+
let mut pool = mocked!();
201+
let key = key("127.0.0.1", 3000, "http");
202+
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true;
203+
{
204+
let locked = pool.inner.lock().unwrap();
205+
assert_eq!(locked.conns.len(), 1);
206+
assert_eq!(locked.conns.get(&key).unwrap().len(), 1);
207+
}
208+
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused
209+
{
210+
let locked = pool.inner.lock().unwrap();
211+
assert_eq!(locked.conns.len(), 1);
212+
assert_eq!(locked.conns.get(&key).unwrap().len(), 1);
213+
}
214+
}
215+
216+
#[test]
217+
fn test_closed() {
218+
let mut pool = mocked!();
219+
let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap();
220+
stream.close(Shutdown::Both).unwrap();
221+
drop(stream);
222+
let locked = pool.inner.lock().unwrap();
223+
assert_eq!(locked.conns.len(), 0);
224+
}
225+
226+
227+
}

src/client/request.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
//! Client Requests
22
use std::marker::PhantomData;
33
use std::io::{self, Write, BufWriter};
4+
use std::net::Shutdown;
45

56
use url::Url;
67

78
use method::{self, Method};
89
use header::Headers;
910
use header::{self, Host};
1011
use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming};
11-
use http::{HttpWriter, LINE_ENDING};
12+
use http::{self, HttpWriter, LINE_ENDING};
1213
use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter, EmptyWriter};
1314
use version;
1415
use HttpResult;
@@ -154,7 +155,10 @@ impl Request<Streaming> {
154155
///
155156
/// Consumes the Request.
156157
pub fn send(self) -> HttpResult<Response> {
157-
let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes
158+
let mut raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes
159+
if !http::should_keep_alive(self.version, &self.headers) {
160+
try!(raw.close(Shutdown::Write));
161+
}
158162
Response::new(raw)
159163
}
160164
}

src/client/response.rs

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Client Responses
22
use std::io::{self, Read};
33
use std::marker::PhantomData;
4+
use std::net::Shutdown;
45

56
use buffer::BufReader;
67
use header;
@@ -42,6 +43,10 @@ impl Response {
4243
debug!("version={:?}, status={:?}", head.version, status);
4344
debug!("headers={:?}", headers);
4445

46+
if !http::should_keep_alive(head.version, &headers) {
47+
try!(stream.get_mut().close(Shutdown::Write));
48+
}
49+
4550
let body = if headers.has::<TransferEncoding>() {
4651
match headers.get::<TransferEncoding>() {
4752
Some(&TransferEncoding(ref codings)) => {

src/http.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::fmt;
77
use httparse;
88

99
use buffer::BufReader;
10-
use header::Headers;
10+
use header::{Headers, Connection};
11+
use header::ConnectionOption::{Close, KeepAlive};
1112
use method::Method;
1213
use status::StatusCode;
1314
use uri::RequestUri;
@@ -443,6 +444,15 @@ pub const LINE_ENDING: &'static str = "\r\n";
443444
#[derive(Clone, PartialEq, Debug)]
444445
pub struct RawStatus(pub u16, pub Cow<'static, str>);
445446

447+
/// Checks if a connection should be kept alive.
448+
pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool {
449+
match (version, headers.get::<Connection>()) {
450+
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
451+
(Http11, Some(conn)) if conn.contains(&Close) => false,
452+
_ => true
453+
}
454+
}
455+
446456
#[cfg(test)]
447457
mod tests {
448458
use std::io::{self, Write};

src/net.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use std::any::{Any, TypeId};
33
use std::fmt;
44
use std::io::{self, Read, Write};
5-
use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener};
5+
use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
66
use std::mem;
77
use std::path::Path;
88
use std::sync::Arc;
@@ -57,6 +57,10 @@ impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> {
5757
pub trait NetworkStream: Read + Write + Any + Send + Typeable {
5858
/// Get the remote address of the underlying connection.
5959
fn peer_addr(&mut self) -> io::Result<SocketAddr>;
60+
/// This will be called when Stream should no longer be kept alive.
61+
fn close(&mut self, _how: Shutdown) -> io::Result<()> {
62+
Ok(())
63+
}
6064
}
6165

6266
/// A connector creates a NetworkStream.
@@ -123,6 +127,7 @@ impl NetworkStream + Send {
123127
}
124128

125129
/// If the underlying type is T, extract it.
130+
#[inline]
126131
pub fn downcast<T: Any>(self: Box<NetworkStream + Send>)
127132
-> Result<Box<T>, Box<NetworkStream + Send>> {
128133
if self.is::<T>() {
@@ -277,12 +282,21 @@ impl Write for HttpStream {
277282
}
278283

279284
impl NetworkStream for HttpStream {
285+
#[inline]
280286
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
281287
match *self {
282288
HttpStream::Http(ref mut inner) => inner.0.peer_addr(),
283289
HttpStream::Https(ref mut inner) => inner.get_mut().0.peer_addr()
284290
}
285291
}
292+
293+
#[inline]
294+
fn close(&mut self, how: Shutdown) -> io::Result<()> {
295+
match *self {
296+
HttpStream::Http(ref mut inner) => inner.0.shutdown(how),
297+
HttpStream::Https(ref mut inner) => inner.get_mut().0.shutdown(how)
298+
}
299+
}
286300
}
287301

288302
/// A connector that will produce HttpStreams.

0 commit comments

Comments
 (0)