From fd4f2f1be09481fc870c3ba911bf5c23e4a18707 Mon Sep 17 00:00:00 2001 From: eva Date: Sat, 20 Jul 2024 16:07:52 +0300 Subject: [PATCH] change futures channel to tokio channel (#28) --- Cargo.toml | 4 ++-- src/connection/mod.rs | 4 ++-- src/lib.rs | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5e4d0f5..76323eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,14 +21,14 @@ rustls-tls = ["tokio-rustls", "rustls-pki-types"] [dependencies] bytes = "1.5.0" -futures-channel = "0.3.30" futures-util = { version = "0.3.30", features = ["sink"] } hostname = "0.3.1" rustls-pki-types = { version = "1.1.0", optional = true } serde_json = "1.0.108" thiserror = "1.0.52" -tokio = { version = "1.35.1", features = ["io-util", "net", "time"] } +tokio = { version = "1.35.1", features = ["io-util", "net", "time", "sync"] } tokio-rustls = { version = "0.25.0", optional = true } +tokio-stream = "0.1.15" tokio-util = { version = "0.7.10", features = ["codec", "net"] } tracing-core = "0.1.32" tracing-futures = "0.2.5" diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 0d98c4d..53eeb79 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -4,8 +4,8 @@ mod udp; use std::{io, net::SocketAddr}; use bytes::Bytes; -use futures_channel::mpsc; use tokio::net::{lookup_host, ToSocketAddrs}; +use tokio_stream::wrappers::ReceiverStream; use tracing_core::subscriber::NoSubscriber; use tracing_futures::WithSubscriber; @@ -26,7 +26,7 @@ pub struct ConnectionErrors(pub Vec<(SocketAddr, io::Error)>); #[must_use] pub struct ConnectionHandle { pub(crate) addr: A, - pub(crate) receiver: mpsc::Receiver, + pub(crate) receiver: ReceiverStream, pub(crate) conn: Conn, } diff --git a/src/lib.rs b/src/lib.rs index e1a9ce5..3693c7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,9 +59,10 @@ mod visitor; use std::{borrow::Cow, collections::HashMap, fmt::Display}; use bytes::Bytes; -use futures_channel::mpsc; use serde_json::{map::Map, Value}; use tokio::net::ToSocketAddrs; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tracing_core::{ dispatcher::SetGlobalDefaultError, span::{Attributes, Id, Record}, @@ -248,7 +249,7 @@ impl Builder { let (sender, receiver) = mpsc::channel::(buffer); let handle = ConnectionHandle { addr, - receiver, + receiver: ReceiverStream::new(receiver), conn, }; let logger = Logger {