|
1 | 1 | use std::fmt;
|
| 2 | +#[cfg(feature = "server")] |
| 3 | +use std::future::Future; |
2 | 4 | use std::io;
|
3 | 5 | use std::marker::{PhantomData, Unpin};
|
4 | 6 | use std::pin::Pin;
|
5 | 7 | use std::task::{Context, Poll};
|
6 | 8 | #[cfg(feature = "server")]
|
7 |
| -use std::time::Duration; |
| 9 | +use std::time::{Duration, Instant}; |
8 | 10 |
|
9 | 11 | use crate::rt::{Read, Write};
|
10 | 12 | use bytes::{Buf, Bytes};
|
@@ -209,33 +211,67 @@ where
|
209 | 211 | debug_assert!(self.can_read_head());
|
210 | 212 | trace!("Conn::read_head");
|
211 | 213 |
|
212 |
| - let msg = match ready!(self.io.parse::<T>( |
| 214 | + #[cfg(feature = "server")] |
| 215 | + if !self.state.h1_header_read_timeout_running { |
| 216 | + if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout { |
| 217 | + let deadline = Instant::now() + h1_header_read_timeout; |
| 218 | + self.state.h1_header_read_timeout_running = true; |
| 219 | + match self.state.h1_header_read_timeout_fut { |
| 220 | + Some(ref mut h1_header_read_timeout_fut) => { |
| 221 | + trace!("resetting h1 header read timeout timer"); |
| 222 | + self.state.timer.reset(h1_header_read_timeout_fut, deadline); |
| 223 | + } |
| 224 | + None => { |
| 225 | + trace!("setting h1 header read timeout timer"); |
| 226 | + self.state.h1_header_read_timeout_fut = |
| 227 | + Some(self.state.timer.sleep_until(deadline)); |
| 228 | + } |
| 229 | + } |
| 230 | + } |
| 231 | + } |
| 232 | + |
| 233 | + let msg = match self.io.parse::<T>( |
213 | 234 | cx,
|
214 | 235 | ParseContext {
|
215 | 236 | cached_headers: &mut self.state.cached_headers,
|
216 | 237 | req_method: &mut self.state.method,
|
217 | 238 | h1_parser_config: self.state.h1_parser_config.clone(),
|
218 | 239 | h1_max_headers: self.state.h1_max_headers,
|
219 |
| - #[cfg(feature = "server")] |
220 |
| - h1_header_read_timeout: self.state.h1_header_read_timeout, |
221 |
| - #[cfg(feature = "server")] |
222 |
| - h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut, |
223 |
| - #[cfg(feature = "server")] |
224 |
| - h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running, |
225 |
| - #[cfg(feature = "server")] |
226 |
| - timer: self.state.timer.clone(), |
227 | 240 | preserve_header_case: self.state.preserve_header_case,
|
228 | 241 | #[cfg(feature = "ffi")]
|
229 | 242 | preserve_header_order: self.state.preserve_header_order,
|
230 | 243 | h09_responses: self.state.h09_responses,
|
231 | 244 | #[cfg(feature = "ffi")]
|
232 | 245 | on_informational: &mut self.state.on_informational,
|
| 246 | + }, |
| 247 | + ) { |
| 248 | + Poll::Ready(Ok(msg)) => msg, |
| 249 | + Poll::Ready(Err(e)) => return self.on_read_head_error(e), |
| 250 | + Poll::Pending => { |
| 251 | + #[cfg(feature = "server")] |
| 252 | + if self.state.h1_header_read_timeout_running { |
| 253 | + if let Some(ref mut h1_header_read_timeout_fut) = |
| 254 | + self.state.h1_header_read_timeout_fut |
| 255 | + { |
| 256 | + if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() { |
| 257 | + self.state.h1_header_read_timeout_running = false; |
| 258 | + |
| 259 | + warn!("read header from client timeout"); |
| 260 | + return Poll::Ready(Some(Err(crate::Error::new_header_timeout()))); |
| 261 | + } |
| 262 | + } |
| 263 | + } |
| 264 | + |
| 265 | + return Poll::Pending; |
233 | 266 | }
|
234 |
| - )) { |
235 |
| - Ok(msg) => msg, |
236 |
| - Err(e) => return self.on_read_head_error(e), |
237 | 267 | };
|
238 | 268 |
|
| 269 | + #[cfg(feature = "server")] |
| 270 | + { |
| 271 | + self.state.h1_header_read_timeout_running = false; |
| 272 | + self.state.h1_header_read_timeout_fut = None; |
| 273 | + } |
| 274 | + |
239 | 275 | // Note: don't deconstruct `msg` into local variables, it appears
|
240 | 276 | // the optimizer doesn't remove the extra copies.
|
241 | 277 |
|
|
0 commit comments