Skip to content

Commit 482a5f5

Browse files
committed
fix(lib): return an error instead of panic if execute fails
If executing an internal task fails, a new variant of `hyper::Error` is returned to the user, with improved messaging. If a non-critical task fails to spawn, it no longer panics, instead just logging a warning. Closes #1566
1 parent 27db8b0 commit 482a5f5

File tree

7 files changed

+63
-30
lines changed

7 files changed

+63
-30
lines changed

src/client/mod.rs

+27-14
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,20 @@ where C: Connect + Sync + 'static,
270270
.http2_only(pool_key.1 == Ver::Http2)
271271
.handshake(io)
272272
.and_then(move |(tx, conn)| {
273-
executor.execute(conn.map_err(|e| {
273+
let bg = executor.execute(conn.map_err(|e| {
274274
debug!("client connection error: {}", e)
275275
}));
276276

277+
// This task is critical, so an execute error
278+
// should be returned.
279+
if let Err(err) = bg {
280+
warn!("error spawning critical client task: {}", err);
281+
return Either::A(future::err(err));
282+
}
283+
277284
// Wait for 'conn' to ready up before we
278285
// declare this tx as usable
279-
tx.when_ready()
286+
Either::B(tx.when_ready())
280287
})
281288
.map(move |tx| {
282289
pool.pooled(connecting, PoolClient {
@@ -373,26 +380,32 @@ where C: Connect + Sync + 'static,
373380
} else if !res.body().is_end_stream() {
374381
let (delayed_tx, delayed_rx) = oneshot::channel();
375382
res.body_mut().delayed_eof(delayed_rx);
376-
executor.execute(
377-
future::poll_fn(move || {
378-
pooled.poll_ready()
379-
})
383+
let on_idle = future::poll_fn(move || {
384+
pooled.poll_ready()
385+
})
380386
.then(move |_| {
381387
// At this point, `pooled` is dropped, and had a chance
382388
// to insert into the pool (if conn was idle)
383389
drop(delayed_tx);
384390
Ok(())
385-
})
386-
);
391+
});
392+
393+
if let Err(err) = executor.execute(on_idle) {
394+
// This task isn't critical, so just log and ignore.
395+
warn!("error spawning task to insert idle connection: {}", err);
396+
}
387397
} else {
388398
// There's no body to delay, but the connection isn't
389399
// ready yet. Only re-insert when it's ready
390-
executor.execute(
391-
future::poll_fn(move || {
392-
pooled.poll_ready()
393-
})
394-
.then(|_| Ok(()))
395-
);
400+
let on_idle = future::poll_fn(move || {
401+
pooled.poll_ready()
402+
})
403+
.then(|_| Ok(()));
404+
405+
if let Err(err) = executor.execute(on_idle) {
406+
// This task isn't critical, so just log and ignore.
407+
warn!("error spawning task to insert idle connection: {}", err);
408+
}
396409
}
397410
Ok(res)
398411
});

src/client/pool.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -407,12 +407,16 @@ impl<T: Poolable> Connections<T> {
407407

408408
let start = Instant::now() + dur;
409409

410-
let interval = Interval::new(start, dur);
411-
self.exec.execute(IdleInterval {
412-
interval: interval,
410+
let interval = IdleInterval {
411+
interval: Interval::new(start, dur),
413412
pool: WeakOpt::downgrade(pool_ref),
414413
pool_drop_notifier: rx,
415-
});
414+
};
415+
416+
if let Err(err) = self.exec.execute(interval) {
417+
// This task isn't critical, so simply log and ignore.
418+
warn!("error spawning connection pool idle interval: {}", err);
419+
}
416420
}
417421
}
418422

src/common/exec.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@ pub(crate) enum Exec {
1313

1414

1515
impl Exec {
16-
pub(crate) fn execute<F>(&self, fut: F)
16+
pub(crate) fn execute<F>(&self, fut: F) -> ::Result<()>
1717
where
1818
F: Future<Item=(), Error=()> + Send + 'static,
1919
{
2020
match *self {
2121
Exec::Default => {
2222
#[cfg(feature = "runtime")]
2323
{
24-
::tokio_executor::spawn(fut)
24+
use ::tokio_executor::Executor;
25+
::tokio_executor::DefaultExecutor::current()
26+
.spawn(Box::new(fut))
27+
.map_err(|err| {
28+
warn!("executor error: {:?}", err);
29+
::Error::new_execute()
30+
})
2531
}
2632
#[cfg(not(feature = "runtime"))]
2733
{
@@ -30,10 +36,11 @@ impl Exec {
3036
}
3137
},
3238
Exec::Executor(ref e) => {
33-
let _ = e.execute(Box::new(fut))
39+
e.execute(Box::new(fut))
3440
.map_err(|err| {
35-
panic!("executor error: {:?}", err.kind());
36-
});
41+
warn!("executor error: {:?}", err.kind());
42+
::Error::new_execute()
43+
})
3744
},
3845
}
3946
}

src/error.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ pub(crate) enum Kind {
6767

6868
/// User polled for an upgrade, but low-level API is not using upgrades.
6969
ManualUpgrade,
70+
71+
/// Error trying to call `Executor::execute`.
72+
Execute,
7073
}
7174

7275
#[derive(Debug, PartialEq)]
@@ -114,7 +117,8 @@ impl Error {
114117
Kind::Closed |
115118
Kind::UnsupportedVersion |
116119
Kind::UnsupportedRequestMethod |
117-
Kind::NoUpgrade => true,
120+
Kind::NoUpgrade |
121+
Kind::Execute => true,
118122
_ => false,
119123
}
120124
}
@@ -130,7 +134,7 @@ impl Error {
130134
}
131135

132136
/// Returns the error's cause.
133-
///
137+
///
134138
/// This is identical to `Error::cause` except that it provides extra
135139
/// bounds required to be able to downcast the error.
136140
pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> {
@@ -244,6 +248,10 @@ impl Error {
244248
Error::new(Kind::Shutdown, Some(Box::new(cause)))
245249
}
246250

251+
pub(crate) fn new_execute() -> Error {
252+
Error::new(Kind::Execute, None)
253+
}
254+
247255
pub(crate) fn new_h2(cause: ::h2::Error) -> Error {
248256
Error::new(Kind::Http2, Some(Box::new(cause)))
249257
}
@@ -297,6 +305,7 @@ impl StdError for Error {
297305
Kind::UnsupportedRequestMethod => "request has unsupported HTTP method",
298306
Kind::NoUpgrade => "no upgrade available",
299307
Kind::ManualUpgrade => "upgrade expected but low level API in use",
308+
Kind::Execute => "executor failed to spawn task",
300309

301310
Kind::Io => "an IO error occurred",
302311
}

src/proto/h2/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ where
9494
}
9595
Err(Either::B((never, _))) => match never {},
9696
});
97-
self.executor.execute(fut);
97+
self.executor.execute(fut)?;
9898
State::Ready(request_tx, tx)
9999
},
100100
State::Ready(ref mut tx, ref conn_dropper) => {
@@ -129,7 +129,7 @@ where
129129
drop(conn_drop_ref);
130130
x
131131
});
132-
self.executor.execute(pipe);
132+
self.executor.execute(pipe)?;
133133
}
134134

135135
let fut = fut
@@ -148,7 +148,7 @@ where
148148
}
149149
Ok(())
150150
});
151-
self.executor.execute(fut);
151+
self.executor.execute(fut)?;
152152
continue;
153153
},
154154

src/proto/h2/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ where
132132
::Body::h2(stream, content_length)
133133
});
134134
let fut = H2Stream::new(service.call(req), respond);
135-
exec.execute(fut);
135+
exec.execute(fut)?;
136136
}
137137

138138
// no more incoming streams...

src/server/conn.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ where
641641
// flatten basically
642642
.and_then(|conn| conn.with_upgrades())
643643
.map_err(|err| debug!("conn error: {}", err));
644-
self.serve.protocol.exec.execute(fut);
644+
self.serve.protocol.exec.execute(fut)?;
645645
} else {
646646
return Ok(Async::Ready(()))
647647
}

0 commit comments

Comments
 (0)