Skip to content

Commit

Permalink
feat: tcp_info stats for HTTP mode
Browse files Browse the repository at this point in the history
  • Loading branch information
3Hren committed Feb 4, 2025
1 parent 3fa2cc6 commit 8a873ca
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dwd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pin-project-lite = "0.2"
http-body-util = "0.1"
anyhow = "1"
pnet = "0.35"
libc = "0.2"

[target.'cfg(target_os = "linux")'.dependencies]
netlink-packet-core = { version = "0.7" }
Expand Down
146 changes: 130 additions & 16 deletions dwd/src/engine/http/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use core::{
sync::atomic::{AtomicBool, AtomicU64, Ordering},
time::Duration,
};
use std::{sync::Arc, thread, time::Instant};
use std::{
os::fd::{AsFd, AsRawFd, RawFd},
sync::Arc,
thread,
time::Instant,
};

use anyhow::Error;
use bytes::Bytes;
Expand Down Expand Up @@ -213,8 +218,7 @@ struct CoroWorker<B, D> {
/// Data to send.
data: D,
/// Current TCP socket.
// stream: Option<TcpStream>,
stream: Option<SendRequest<Empty<Bytes>>>,
stream: Option<TcpStream>,
/// The number of requests after which the socket will be recreated.
requests_per_sock: u64,
/// Number of requests done for the currently active socket.
Expand Down Expand Up @@ -280,8 +284,6 @@ where
}
};

// self.buf.clear();

match self.perform_request(&mut sock).await {
Ok(c) if (200..300).contains(&c) => self.stat.on_2xx(now),
Ok(c) if (300..400).contains(&c) => self.stat.on_3xx(now),
Expand All @@ -296,33 +298,40 @@ where

self.requests_per_sock_done += 1;
if self.requests_per_sock_done < self.requests_per_sock {
if self.requests_per_sock_done % 32 == 0 {
self.update_stats(&mut sock);
}
self.stream = Some(sock);
} else {
self.requests_per_sock_done = 0;
self.update_stats(&mut sock);
}
}

#[inline]
async fn perform_request(&mut self, stream: &mut SendRequest<Empty<Bytes>>) -> Result<u16, Error> {
async fn perform_request(&mut self, stream: &mut TcpStream) -> Result<u16, Error> {
let req = self.data.next();
let mut resp = stream.send_request(req.clone()).await?;
let mut resp = stream.sender.send_request(req.clone()).await?;
self.stat.on_requests(1);
// self.stat.on_send(self.request.len() as u64);
// self.stat.on_recv(self.buf.len() as u64);

let code = resp.status().as_u16();
while let Some(next) = resp.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
self.stat.on_recv(chunk.len() as u64);
}
next?;
}

Ok(code)
}

#[inline]
async fn curr_sock(&mut self) -> Result<SendRequest<Empty<Bytes>>, Error> {
fn update_stats(&mut self, stream: &mut TcpStream) {
let tcpi = get_tcp_info(stream.fd);
self.stat.on_send(tcpi.bytes_acked - stream.tcp_info.bytes_acked);
self.stat.on_recv(tcpi.bytes_received - stream.tcp_info.bytes_received);
stream.tcp_info = tcpi;
}

#[inline]
async fn curr_sock(&mut self) -> Result<TcpStream, Error> {
let stream = match self.stream.take() {
Some(stream) => stream,
None => self.reconnect().await?,
Expand All @@ -332,13 +341,14 @@ where
}

#[inline]
async fn reconnect(&mut self) -> Result<SendRequest<Empty<Bytes>>, Error> {
async fn reconnect(&mut self) -> Result<TcpStream, Error> {
let addr = self.bind.next();
let sock = match addr {
SocketAddr::V4(..) => TcpSocket::new_v4()?,
SocketAddr::V6(..) => TcpSocket::new_v6()?,
};
sock.bind(*addr)?;
let fd = sock.as_fd().as_raw_fd();

let stream = sock.connect(self.addr).await?;
if self.tcp_no_delay {
Expand All @@ -357,6 +367,110 @@ where
}
});

Ok(sender)
let stream = TcpStream::new(fd, sender);

Ok(stream)
}
}

#[derive(Debug)]
struct TcpStream {
fd: RawFd,
sender: SendRequest<Empty<Bytes>>,
tcp_info: TcpInfo,
}

impl TcpStream {
pub fn new(fd: RawFd, sender: SendRequest<Empty<Bytes>>) -> Self {
Self {
fd,
sender,
tcp_info: TcpInfo::default(),
}
}
}

#[derive(Debug, Clone, Default)]
struct TcpInfo {
pub bytes_acked: u64,
pub bytes_received: u64,
}

#[cfg(target_os = "linux")]
fn get_tcp_info(fd: RawFd) -> TcpInfo {
#[repr(C)]
#[derive(Debug, Clone, Default)]
struct tcp_info {
pub tcpi_state: u8,
pub tcpi_ca_state: u8,
pub tcpi_retransmits: u8,
pub tcpi_probes: u8,
pub tcpi_backoff: u8,
pub tcpi_options: u8,
/// This contains the bitfields `tcpi_snd_wscale` and
/// `tcpi_rcv_wscale`. Each is 4 bits.
pub tcpi_snd_rcv_wscale: u8,
pub tcpi_rto: u32,
pub tcpi_ato: u32,
pub tcpi_snd_mss: u32,
pub tcpi_rcv_mss: u32,
pub tcpi_unacked: u32,
pub tcpi_sacked: u32,
pub tcpi_lost: u32,
pub tcpi_retrans: u32,
pub tcpi_fackets: u32,
pub tcpi_last_data_sent: u32,
pub tcpi_last_ack_sent: u32,
pub tcpi_last_data_recv: u32,
pub tcpi_last_ack_recv: u32,
pub tcpi_pmtu: u32,
pub tcpi_rcv_ssthresh: u32,
pub tcpi_rtt: u32,
pub tcpi_rttvar: u32,
pub tcpi_snd_ssthresh: u32,
pub tcpi_snd_cwnd: u32,
pub tcpi_advmss: u32,
pub tcpi_reordering: u32,
pub tcpi_rcv_rtt: u32,
pub tcpi_rcv_space: u32,
pub tcpi_total_retrans: u32,

pub tcpi_pacing_rate: u64,
pub tcpi_max_pacing_rate: u64,
pub tcpi_bytes_acked: u64,
pub tcpi_bytes_received: u64,
pub tcpi_segs_out: u32,
pub tcpi_segs_in: u32,
pub tcpi_notsent_bytes: u32,
pub tcpi_min_rtt: u32,
pub tcpi_data_segs_in: u32,
pub tcpi_data_segs_out: u32,
pub tcpi_delivery_rate: u64,
pub tcpi_busy_time: u64,
pub tcpi_rwnd_limited: u64,
pub tcpi_sndbuf_limited: u64,
}
let mut info = tcp_info::default();
let mut len = core::mem::size_of_val(&info) as libc::socklen_t;
unsafe {
_ = libc::getsockopt(
fd,
libc::SOL_TCP,
libc::TCP_INFO,
&mut info as *mut tcp_info as *mut libc::c_void,
&mut len as *mut u32,
);
}

let info = TcpInfo {
bytes_acked: info.tcpi_bytes_acked,
bytes_received: info.tcpi_bytes_received,
};

return info;
}

#[cfg(not(target_os = "linux"))]
fn get_tcp_info(_fd: RawFd) -> TcpInfo {
TcpInfo::default()
}

0 comments on commit 8a873ca

Please # to comment.