Skip to content

Commit fad42ac

Browse files
authored
feat(lib): Upgrade to Tokio 1.0 (#2369)
Closes #2370
1 parent dad5c87 commit fad42ac

File tree

14 files changed

+71
-146
lines changed

14 files changed

+71
-146
lines changed

Cargo.toml

+7-7
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,20 @@ include = [
2222
]
2323

2424
[dependencies]
25-
bytes = "0.6"
25+
bytes = "1"
2626
futures-core = { version = "0.3", default-features = false }
2727
futures-channel = "0.3"
2828
futures-util = { version = "0.3", default-features = false }
2929
http = "0.2"
30-
http-body = { git = "https://github.com/hyperium/http-body" }
30+
http-body = "0.4"
3131
httpdate = "0.3"
3232
httparse = "1.0"
33-
h2 = { git = "https://github.com/hyperium/h2", optional = true }
33+
h2 = { version = "0.3", optional = true }
3434
itoa = "0.4.1"
3535
tracing = { version = "0.1", default-features = false, features = ["std"] }
3636
pin-project = "1.0"
3737
tower-service = "0.3"
38-
tokio = { version = "0.3.4", features = ["sync", "stream"] }
38+
tokio = { version = "1", features = ["sync"] }
3939
want = "0.3"
4040

4141
# Optional
@@ -51,7 +51,7 @@ spmc = "0.3"
5151
serde = "1.0"
5252
serde_derive = "1.0"
5353
serde_json = "1.0"
54-
tokio = { version = "0.3", features = [
54+
tokio = { version = "1", features = [
5555
"fs",
5656
"macros",
5757
"io-std",
@@ -62,8 +62,8 @@ tokio = { version = "0.3", features = [
6262
"time",
6363
"test-util",
6464
] }
65-
tokio-test = "0.3"
66-
tokio-util = { version = "0.5", features = ["codec"] }
65+
tokio-test = "0.4"
66+
tokio-util = { version = "0.6", features = ["codec"] }
6767
tower-util = "0.3"
6868
url = "1.0"
6969

src/body/to_bytes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ where
2323
let second = if let Some(buf) = body.data().await {
2424
buf?
2525
} else {
26-
return Ok(first.copy_to_bytes(first.bytes().len()));
26+
return Ok(first.copy_to_bytes(first.remaining()));
2727
};
2828

2929
// With more than 1 buf, we gotta flatten into a Vec first.

src/client/connect/http.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -667,8 +667,11 @@ impl ConnectingTcp<'_> {
667667
let fallback_fut = fallback.remote.connect(self.config);
668668
futures_util::pin_mut!(fallback_fut);
669669

670+
let fallback_delay = fallback.delay;
671+
futures_util::pin_mut!(fallback_delay);
672+
670673
let (result, future) =
671-
match futures_util::future::select(preferred_fut, fallback.delay).await {
674+
match futures_util::future::select(preferred_fut, fallback_delay).await {
672675
Either::Left((result, _fallback_delay)) => {
673676
(result, Either::Right(fallback_fut))
674677
}

src/client/dispatch.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#[cfg(feature = "http2")]
22
use std::future::Future;
33

4-
use tokio::stream::Stream;
4+
use futures_util::FutureExt;
55
use tokio::sync::{mpsc, oneshot};
66

77
use crate::common::{task, Pin, Poll};
@@ -150,8 +150,8 @@ impl<T, U> Receiver<T, U> {
150150
self: Pin<&mut Self>,
151151
cx: &mut task::Context<'_>,
152152
) -> Poll<Option<(T, Callback<T, U>)>> {
153-
let this = self.project();
154-
match this.inner.poll_next(cx) {
153+
let mut this = self.project();
154+
match this.inner.poll_recv(cx) {
155155
Poll::Ready(item) => {
156156
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
157157
}
@@ -170,9 +170,9 @@ impl<T, U> Receiver<T, U> {
170170

171171
#[cfg(feature = "http1")]
172172
pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
173-
match self.inner.try_recv() {
174-
Ok(mut env) => env.0.take(),
175-
Err(_) => None,
173+
match self.inner.recv().now_or_never() {
174+
Some(Some(mut env)) => env.0.take(),
175+
_ => None,
176176
}
177177
}
178178
}

src/client/pool.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,6 @@ impl<T: Poolable + 'static> Future for IdleTask<T> {
731731
type Output = ();
732732

733733
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
734-
use tokio::stream::Stream;
735734
let mut this = self.project();
736735
loop {
737736
match this.pool_drop_notifier.as_mut().poll(cx) {
@@ -743,7 +742,7 @@ impl<T: Poolable + 'static> Future for IdleTask<T> {
743742
}
744743
}
745744

746-
ready!(this.interval.as_mut().poll_next(cx));
745+
ready!(this.interval.as_mut().poll_tick(cx));
747746

748747
if let Some(inner) = this.pool.upgrade() {
749748
if let Ok(mut inner) = inner.lock() {

src/common/buf.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ impl<T: Buf> Buf for BufList<T> {
3434
}
3535

3636
#[inline]
37-
fn bytes(&self) -> &[u8] {
38-
self.bufs.front().map(Buf::bytes).unwrap_or_default()
37+
fn chunk(&self) -> &[u8] {
38+
self.bufs.front().map(Buf::chunk).unwrap_or_default()
3939
}
4040

4141
#[inline]
@@ -57,13 +57,13 @@ impl<T: Buf> Buf for BufList<T> {
5757
}
5858

5959
#[inline]
60-
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
60+
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
6161
if dst.is_empty() {
6262
return 0;
6363
}
6464
let mut vecs = 0;
6565
for buf in &self.bufs {
66-
vecs += buf.bytes_vectored(&mut dst[vecs..]);
66+
vecs += buf.chunks_vectored(&mut dst[vecs..]);
6767
if vecs == dst.len() {
6868
break;
6969
}

src/common/io/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
mod rewind;
22

33
pub(crate) use self::rewind::Rewind;
4-
pub(crate) const MAX_WRITEV_BUFS: usize = 64;

src/proto/h1/encode.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,12 @@ where
229229
}
230230

231231
#[inline]
232-
fn bytes(&self) -> &[u8] {
232+
fn chunk(&self) -> &[u8] {
233233
match self.kind {
234-
BufKind::Exact(ref b) => b.bytes(),
235-
BufKind::Limited(ref b) => b.bytes(),
236-
BufKind::Chunked(ref b) => b.bytes(),
237-
BufKind::ChunkedEnd(ref b) => b.bytes(),
234+
BufKind::Exact(ref b) => b.chunk(),
235+
BufKind::Limited(ref b) => b.chunk(),
236+
BufKind::Chunked(ref b) => b.chunk(),
237+
BufKind::ChunkedEnd(ref b) => b.chunk(),
238238
}
239239
}
240240

@@ -249,12 +249,12 @@ where
249249
}
250250

251251
#[inline]
252-
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
252+
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
253253
match self.kind {
254-
BufKind::Exact(ref b) => b.bytes_vectored(dst),
255-
BufKind::Limited(ref b) => b.bytes_vectored(dst),
256-
BufKind::Chunked(ref b) => b.bytes_vectored(dst),
257-
BufKind::ChunkedEnd(ref b) => b.bytes_vectored(dst),
254+
BufKind::Exact(ref b) => b.chunks_vectored(dst),
255+
BufKind::Limited(ref b) => b.chunks_vectored(dst),
256+
BufKind::Chunked(ref b) => b.chunks_vectored(dst),
257+
BufKind::ChunkedEnd(ref b) => b.chunks_vectored(dst),
258258
}
259259
}
260260
}
@@ -295,7 +295,7 @@ impl Buf for ChunkSize {
295295
}
296296

297297
#[inline]
298-
fn bytes(&self) -> &[u8] {
298+
fn chunk(&self) -> &[u8] {
299299
&self.bytes[self.pos.into()..self.len.into()]
300300
}
301301

src/proto/h1/io.rs

+13-12
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ where
186186
self.read_buf.reserve(next);
187187
}
188188

189-
let dst = self.read_buf.bytes_mut();
189+
let dst = self.read_buf.chunk_mut();
190190
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
191191
let mut buf = ReadBuf::uninit(dst);
192192
match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
@@ -231,10 +231,11 @@ where
231231
return self.poll_flush_flattened(cx);
232232
}
233233

234+
const MAX_WRITEV_BUFS: usize = 64;
234235
loop {
235236
let n = {
236-
let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS];
237-
let len = self.write_buf.bytes_vectored(&mut iovs);
237+
let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
238+
let len = self.write_buf.chunks_vectored(&mut iovs);
238239
ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
239240
};
240241
// TODO(eliza): we have to do this manually because
@@ -262,7 +263,7 @@ where
262263
/// that skips some bookkeeping around using multiple buffers.
263264
fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
264265
loop {
265-
let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.bytes()))?;
266+
let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
266267
debug!("flushed {} bytes", n);
267268
self.write_buf.headers.advance(n);
268269
if self.write_buf.headers.remaining() == 0 {
@@ -433,7 +434,7 @@ impl<T: AsRef<[u8]>> Buf for Cursor<T> {
433434
}
434435

435436
#[inline]
436-
fn bytes(&self) -> &[u8] {
437+
fn chunk(&self) -> &[u8] {
437438
&self.bytes.as_ref()[self.pos..]
438439
}
439440

@@ -487,7 +488,7 @@ where
487488
//but accomplishes the same result.
488489
loop {
489490
let adv = {
490-
let slice = buf.bytes();
491+
let slice = buf.chunk();
491492
if slice.is_empty() {
492493
return;
493494
}
@@ -534,12 +535,12 @@ impl<B: Buf> Buf for WriteBuf<B> {
534535
}
535536

536537
#[inline]
537-
fn bytes(&self) -> &[u8] {
538-
let headers = self.headers.bytes();
538+
fn chunk(&self) -> &[u8] {
539+
let headers = self.headers.chunk();
539540
if !headers.is_empty() {
540541
headers
541542
} else {
542-
self.queue.bytes()
543+
self.queue.chunk()
543544
}
544545
}
545546

@@ -559,9 +560,9 @@ impl<B: Buf> Buf for WriteBuf<B> {
559560
}
560561

561562
#[inline]
562-
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
563-
let n = self.headers.bytes_vectored(dst);
564-
self.queue.bytes_vectored(&mut dst[n..]) + n
563+
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
564+
let n = self.headers.chunks_vectored(dst);
565+
self.queue.chunks_vectored(&mut dst[n..]) + n
565566
}
566567
}
567568

src/proto/h2/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,8 @@ impl<B: Buf> Buf for SendBuf<B> {
257257
}
258258

259259
#[inline]
260-
fn bytes(&self) -> &[u8] {
261-
self.0.as_ref().map(|b| b.bytes()).unwrap_or(&[])
260+
fn chunk(&self) -> &[u8] {
261+
self.0.as_ref().map(|b| b.chunk()).unwrap_or(&[])
262262
}
263263

264264
#[inline]
@@ -268,7 +268,7 @@ impl<B: Buf> Buf for SendBuf<B> {
268268
}
269269
}
270270

271-
fn bytes_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
272-
self.0.as_ref().map(|b| b.bytes_vectored(dst)).unwrap_or(0)
271+
fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
272+
self.0.as_ref().map(|b| b.chunks_vectored(dst)).unwrap_or(0)
273273
}
274274
}

src/proto/h2/ping.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
6060
interval,
6161
timeout: config.keep_alive_timeout,
6262
while_idle: config.keep_alive_while_idle,
63-
timer: tokio::time::sleep(interval),
63+
timer: Box::pin(tokio::time::sleep(interval)),
6464
state: KeepAliveState::Init,
6565
});
6666

@@ -156,7 +156,7 @@ struct KeepAlive {
156156
while_idle: bool,
157157

158158
state: KeepAliveState,
159-
timer: Sleep,
159+
timer: Pin<Box<Sleep>>,
160160
}
161161

162162
#[cfg(feature = "runtime")]
@@ -441,7 +441,7 @@ impl KeepAlive {
441441

442442
self.state = KeepAliveState::Scheduled;
443443
let interval = shared.last_read_at() + self.interval;
444-
self.timer.reset(interval);
444+
self.timer.as_mut().reset(interval);
445445
}
446446
KeepAliveState::PingSent => {
447447
if shared.is_ping_sent() {
@@ -450,7 +450,7 @@ impl KeepAlive {
450450

451451
self.state = KeepAliveState::Scheduled;
452452
let interval = shared.last_read_at() + self.interval;
453-
self.timer.reset(interval);
453+
self.timer.as_mut().reset(interval);
454454
}
455455
KeepAliveState::Scheduled => (),
456456
}
@@ -472,7 +472,7 @@ impl KeepAlive {
472472
shared.send_ping();
473473
self.state = KeepAliveState::PingSent;
474474
let timeout = Instant::now() + self.timeout;
475-
self.timer.reset(timeout);
475+
self.timer.as_mut().reset(timeout);
476476
}
477477
KeepAliveState::Init | KeepAliveState::PingSent => (),
478478
}

src/server/tcp.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct AddrIncoming {
1919
sleep_on_errors: bool,
2020
tcp_keepalive_timeout: Option<Duration>,
2121
tcp_nodelay: bool,
22-
timeout: Option<Sleep>,
22+
timeout: Option<Pin<Box<Sleep>>>,
2323
}
2424

2525
impl AddrIncoming {
@@ -160,9 +160,9 @@ impl AddrIncoming {
160160
error!("accept error: {}", e);
161161

162162
// Sleep 1s.
163-
let mut timeout = tokio::time::sleep(Duration::from_secs(1));
163+
let mut timeout = Box::pin(tokio::time::sleep(Duration::from_secs(1)));
164164

165-
match Pin::new(&mut timeout).poll(cx) {
165+
match timeout.as_mut().poll(cx) {
166166
Poll::Ready(()) => {
167167
// Wow, it's been a second already? Ok then...
168168
continue;
@@ -263,7 +263,7 @@ mod addr_stream {
263263
pub fn poll_peek(
264264
&mut self,
265265
cx: &mut task::Context<'_>,
266-
buf: &mut [u8],
266+
buf: &mut tokio::io::ReadBuf<'_>,
267267
) -> Poll<io::Result<usize>> {
268268
self.inner.poll_peek(cx, buf)
269269
}

0 commit comments

Comments
 (0)