Skip to content

Commit d67e49f

Browse files
committed
feat(client): change Connect trait into an alias for Service
The `Connect` trait is now essentially an alias for `Service<Destination>`, with a blanket implementation as such, and is sealed. Closes #1902 BREAKING CHANGE: Any manual implementations of `Connect` must instead implement `tower::Service<Destination>`.
1 parent 4f27439 commit d67e49f

File tree

6 files changed

+256
-113
lines changed

6 files changed

+256
-113
lines changed

src/client/connect/http.rs

+99-64
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::error::Error as StdError;
33
use std::io;
44
use std::mem;
55
use std::net::{IpAddr, SocketAddr};
6+
use std::sync::Arc;
67
use std::time::Duration;
78

89
use http::uri::{Scheme, Uri};
@@ -13,7 +14,7 @@ use tokio_net::tcp::TcpStream;
1314
use tokio_timer::{Delay, Timeout};
1415

1516
use crate::common::{Future, Pin, Poll, task};
16-
use super::{Connect, Connected, Destination};
17+
use super::{Connected, Destination};
1718
use super::dns::{self, GaiResolver, Resolve};
1819
#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
1920

@@ -30,17 +31,8 @@ type ConnectFuture = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>
3031
/// transport information such as the remote socket address used.
3132
#[derive(Clone)]
3233
pub struct HttpConnector<R = GaiResolver> {
33-
enforce_http: bool,
34-
handle: Option<Handle>,
35-
connect_timeout: Option<Duration>,
36-
happy_eyeballs_timeout: Option<Duration>,
37-
keep_alive_timeout: Option<Duration>,
38-
local_address: Option<IpAddr>,
39-
nodelay: bool,
34+
config: Arc<Config>,
4035
resolver: R,
41-
reuse_address: bool,
42-
send_buffer_size: Option<usize>,
43-
recv_buffer_size: Option<usize>,
4436
}
4537

4638
/// Extra information about the transport when an HttpConnector is used.
@@ -76,6 +68,22 @@ pub struct HttpInfo {
7668
remote_addr: SocketAddr,
7769
}
7870

71+
#[derive(Clone)]
72+
struct Config {
73+
connect_timeout: Option<Duration>,
74+
enforce_http: bool,
75+
handle: Option<Handle>,
76+
happy_eyeballs_timeout: Option<Duration>,
77+
keep_alive_timeout: Option<Duration>,
78+
local_address: Option<IpAddr>,
79+
nodelay: bool,
80+
reuse_address: bool,
81+
send_buffer_size: Option<usize>,
82+
recv_buffer_size: Option<usize>,
83+
}
84+
85+
// ===== impl HttpConnector =====
86+
7987
impl HttpConnector {
8088
/// Construct a new HttpConnector.
8189
pub fn new() -> HttpConnector {
@@ -100,17 +108,19 @@ impl<R> HttpConnector<R> {
100108
/// Takes a `Resolve` to handle DNS lookups.
101109
pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
102110
HttpConnector {
103-
enforce_http: true,
104-
handle: None,
105-
connect_timeout: None,
106-
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
107-
keep_alive_timeout: None,
108-
local_address: None,
109-
nodelay: false,
111+
config: Arc::new(Config {
112+
connect_timeout: None,
113+
enforce_http: true,
114+
handle: None,
115+
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
116+
keep_alive_timeout: None,
117+
local_address: None,
118+
nodelay: false,
119+
reuse_address: false,
120+
send_buffer_size: None,
121+
recv_buffer_size: None,
122+
}),
110123
resolver,
111-
reuse_address: false,
112-
send_buffer_size: None,
113-
recv_buffer_size: None,
114124
}
115125
}
116126

@@ -119,15 +129,15 @@ impl<R> HttpConnector<R> {
119129
/// Enabled by default.
120130
#[inline]
121131
pub fn enforce_http(&mut self, is_enforced: bool) {
122-
self.enforce_http = is_enforced;
132+
self.config_mut().enforce_http = is_enforced;
123133
}
124134

125135
/// Set a handle to a `Reactor` to register connections to.
126136
///
127137
/// If `None`, the implicit default reactor will be used.
128138
#[inline]
129139
pub fn set_reactor(&mut self, handle: Option<Handle>) {
130-
self.handle = handle;
140+
self.config_mut().handle = handle;
131141
}
132142

133143
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
@@ -137,27 +147,27 @@ impl<R> HttpConnector<R> {
137147
/// Default is `None`.
138148
#[inline]
139149
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
140-
self.keep_alive_timeout = dur;
150+
self.config_mut().keep_alive_timeout = dur;
141151
}
142152

143153
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
144154
///
145155
/// Default is `false`.
146156
#[inline]
147157
pub fn set_nodelay(&mut self, nodelay: bool) {
148-
self.nodelay = nodelay;
158+
self.config_mut().nodelay = nodelay;
149159
}
150160

151161
/// Sets the value of the SO_SNDBUF option on the socket.
152162
#[inline]
153163
pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
154-
self.send_buffer_size = size;
164+
self.config_mut().send_buffer_size = size;
155165
}
156166

157167
/// Sets the value of the SO_RCVBUF option on the socket.
158168
#[inline]
159169
pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
160-
self.recv_buffer_size = size;
170+
self.config_mut().recv_buffer_size = size;
161171
}
162172

163173
/// Set that all sockets are bound to the configured address before connection.
@@ -167,7 +177,7 @@ impl<R> HttpConnector<R> {
167177
/// Default is `None`.
168178
#[inline]
169179
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
170-
self.local_address = addr;
180+
self.config_mut().local_address = addr;
171181
}
172182

173183
/// Set the connect timeout.
@@ -178,7 +188,7 @@ impl<R> HttpConnector<R> {
178188
/// Default is `None`.
179189
#[inline]
180190
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
181-
self.connect_timeout = dur;
191+
self.config_mut().connect_timeout = dur;
182192
}
183193

184194
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
@@ -195,17 +205,26 @@ impl<R> HttpConnector<R> {
195205
/// [RFC 6555]: https://tools.ietf.org/html/rfc6555
196206
#[inline]
197207
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
198-
self.happy_eyeballs_timeout = dur;
208+
self.config_mut().happy_eyeballs_timeout = dur;
199209
}
200210

201211
/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
202212
///
203213
/// Default is `false`.
204214
#[inline]
205215
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
206-
self.reuse_address = reuse_address;
216+
self.config_mut().reuse_address = reuse_address;
207217
self
208218
}
219+
220+
// private
221+
222+
fn config_mut(&mut self) -> &mut Config {
223+
// If the are HttpConnector clones, this will clone the inner
224+
// config. So mutating the config won't ever affect previous
225+
// clones.
226+
Arc::make_mut(&mut self.config)
227+
}
209228
}
210229

211230
// R: Debug required for now to allow adding it to debug output later...
@@ -216,51 +235,59 @@ impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
216235
}
217236
}
218237

219-
impl<R> Connect for HttpConnector<R>
238+
impl<R> tower_service::Service<Destination> for HttpConnector<R>
220239
where
221240
R: Resolve + Clone + Send + Sync,
222241
R::Future: Send,
223242
{
224-
type Transport = TcpStream;
243+
type Response = (TcpStream, Connected);
225244
type Error = io::Error;
226245
type Future = HttpConnecting<R>;
227246

228-
fn connect(&self, dst: Destination) -> Self::Future {
247+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
248+
// For now, always ready.
249+
// TODO: When `Resolve` becomes an alias for `Service`, check
250+
// the resolver's readiness.
251+
drop(cx);
252+
Poll::Ready(Ok(()))
253+
}
254+
255+
fn call(&mut self, dst: Destination) -> Self::Future {
229256
trace!(
230257
"Http::connect; scheme={}, host={}, port={:?}",
231258
dst.scheme(),
232259
dst.host(),
233260
dst.port(),
234261
);
235262

236-
if self.enforce_http {
263+
if self.config.enforce_http {
237264
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
238-
return invalid_url(InvalidUrl::NotHttp, &self.handle);
265+
return invalid_url(InvalidUrl::NotHttp, &self.config.handle);
239266
}
240267
} else if dst.uri.scheme_part().is_none() {
241-
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
268+
return invalid_url(InvalidUrl::MissingScheme, &self.config.handle);
242269
}
243270

244271
let host = match dst.uri.host() {
245272
Some(s) => s,
246-
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
273+
None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle),
247274
};
248275
let port = match dst.uri.port_part() {
249276
Some(port) => port.as_u16(),
250277
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
251278
};
252279

253280
HttpConnecting {
254-
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
255-
handle: self.handle.clone(),
256-
connect_timeout: self.connect_timeout,
257-
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
258-
keep_alive_timeout: self.keep_alive_timeout,
259-
nodelay: self.nodelay,
281+
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
282+
handle: self.config.handle.clone(),
283+
connect_timeout: self.config.connect_timeout,
284+
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
285+
keep_alive_timeout: self.config.keep_alive_timeout,
286+
nodelay: self.config.nodelay,
260287
port,
261-
reuse_address: self.reuse_address,
262-
send_buffer_size: self.send_buffer_size,
263-
recv_buffer_size: self.recv_buffer_size,
288+
reuse_address: self.config.reuse_address,
289+
send_buffer_size: self.config.send_buffer_size,
290+
recv_buffer_size: self.config.recv_buffer_size,
264291
}
265292
}
266293
}
@@ -289,34 +316,34 @@ where
289316
dst.port(),
290317
);
291318

292-
if self.enforce_http {
319+
if self.config.enforce_http {
293320
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
294-
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
321+
return invalid_url::<R>(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed();
295322
}
296323
} else if dst.uri.scheme_part().is_none() {
297-
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
324+
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed();
298325
}
299326

300327
let host = match dst.uri.host() {
301328
Some(s) => s,
302-
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
329+
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(),
303330
};
304331
let port = match dst.uri.port_part() {
305332
Some(port) => port.as_u16(),
306333
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
307334
};
308335

309336
let fut = HttpConnecting {
310-
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
311-
handle: self.handle.clone(),
312-
connect_timeout: self.connect_timeout,
313-
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
314-
keep_alive_timeout: self.keep_alive_timeout,
315-
nodelay: self.nodelay,
337+
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
338+
handle: self.config.handle.clone(),
339+
connect_timeout: self.config.connect_timeout,
340+
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
341+
keep_alive_timeout: self.config.keep_alive_timeout,
342+
nodelay: self.config.nodelay,
316343
port,
317-
reuse_address: self.reuse_address,
318-
send_buffer_size: self.send_buffer_size,
319-
recv_buffer_size: self.recv_buffer_size,
344+
reuse_address: self.config.reuse_address,
345+
send_buffer_size: self.config.send_buffer_size,
346+
recv_buffer_size: self.config.recv_buffer_size,
320347
};
321348

322349
fut.map_ok(|(s, _)| s).boxed()
@@ -671,7 +698,15 @@ mod tests {
671698
use tokio::runtime::current_thread::Runtime;
672699
use tokio_net::driver::Handle;
673700

674-
use super::{Connect, Destination, HttpConnector};
701+
use super::{Connected, Destination, HttpConnector};
702+
use super::super::sealed::Connect;
703+
704+
async fn connect<C>(connector: C, dst: Destination) -> Result<(C::Transport, Connected), C::Error>
705+
where
706+
C: Connect,
707+
{
708+
connector.connect(super::super::sealed::Internal, dst).await
709+
}
675710

676711
#[test]
677712
fn test_errors_missing_authority() {
@@ -684,7 +719,7 @@ mod tests {
684719

685720
rt.block_on(async {
686721
assert_eq!(
687-
connector.connect(dst).await.unwrap_err().kind(),
722+
connect(connector, dst).await.unwrap_err().kind(),
688723
io::ErrorKind::InvalidInput,
689724
);
690725
})
@@ -701,7 +736,7 @@ mod tests {
701736

702737
rt.block_on(async {
703738
assert_eq!(
704-
connector.connect(dst).await.unwrap_err().kind(),
739+
connect(connector, dst).await.unwrap_err().kind(),
705740
io::ErrorKind::InvalidInput,
706741
);
707742
})
@@ -718,7 +753,7 @@ mod tests {
718753

719754
rt.block_on(async {
720755
assert_eq!(
721-
connector.connect(dst).await.unwrap_err().kind(),
756+
connect(connector, dst).await.unwrap_err().kind(),
722757
io::ErrorKind::InvalidInput,
723758
);
724759
});

0 commit comments

Comments
 (0)