From 0cf38d1b1dc5fccaa2393ada75a7cb6cf62c8fc6 Mon Sep 17 00:00:00 2001 From: danbugs Date: Wed, 23 Nov 2022 16:22:08 -0800 Subject: [PATCH] small improvements on the mosquitto implementation Signed-off-by: danbugs --- crates/pubsub/src/implementors/mosquitto.rs | 74 ++++++++++----------- crates/pubsub/src/lib.rs | 2 +- 2 files changed, 37 insertions(+), 39 deletions(-) diff --git a/crates/pubsub/src/implementors/mosquitto.rs b/crates/pubsub/src/implementors/mosquitto.rs index 1805b055..25d279b5 100644 --- a/crates/pubsub/src/implementors/mosquitto.rs +++ b/crates/pubsub/src/implementors/mosquitto.rs @@ -8,9 +8,7 @@ use tokio::{runtime::Handle, task::block_in_place}; #[derive(Clone)] pub struct MosquittoImplementor { - host: String, - port: i32, - subscriptions: Arc>>, + client: Arc>, } // TODO: We need to improve these Debug implementations @@ -31,30 +29,33 @@ impl MosquittoImplementor { .unwrap() .parse::() .unwrap(); - Self { - host, - port, - subscriptions: Arc::new(Mutex::new(Vec::new())), - } + + let client = block_in_place(|| { + Handle::current().block_on(async move { + let mut client = Client::with_auto_id().unwrap(); + + client + .connect(&host, port, std::time::Duration::from_secs(5), None) + .await + .unwrap(); + + Arc::new(Mutex::new(client)) + }) + }); + + Self { client } } } // Pub impl MosquittoImplementor { pub async fn publish(&self, msg_value: &[u8], topic: &str) -> Result<()> { - let mut mqtt = Client::with_auto_id().unwrap(); block_in_place(|| { Handle::current().block_on(async move { - mqtt.connect( - &self.host, - self.port, - std::time::Duration::from_secs(5), - None, - ) - .await - .unwrap(); - - mqtt.publish(topic, msg_value, QoS::AtMostOnce, false) + self.client + .lock() + .unwrap() + .publish(topic, msg_value, QoS::AtMostOnce, false) .await .unwrap() }) @@ -66,32 +67,29 @@ impl MosquittoImplementor { // Sub impl MosquittoImplementor { - pub fn subscribe(&self, topic: &str) -> Result<()> { - self.subscriptions.lock().unwrap().push(topic.to_string()); + pub async fn subscribe(&self, topic: &str) -> Result<()> { + block_in_place(|| { + Handle::current().block_on(async move { + self.client + .lock() + .unwrap() + .subscribe(topic, QoS::AtMostOnce) + .await + .unwrap(); + }) + }); Ok(()) } pub async fn receive(&self) -> Result> { - let mut mqtt = Client::with_auto_id().unwrap(); let mut res: Vec = vec![]; + block_in_place(|| { res = Handle::current().block_on(async move { - mqtt.connect( - &self.host, - self.port, - std::time::Duration::from_secs(5), - None, - ) - .await - .unwrap(); - - let subs_lock = self.subscriptions.lock().unwrap(); - - for t in subs_lock.iter() { - mqtt.subscribe(t, QoS::AtMostOnce).await.unwrap(); - } - - mqtt.subscriber() + self.client + .lock() + .unwrap() + .subscriber() .as_mut() .unwrap() .recv() diff --git a/crates/pubsub/src/lib.rs b/crates/pubsub/src/lib.rs index 8057540a..2cb15160 100644 --- a/crates/pubsub/src/lib.rs +++ b/crates/pubsub/src/lib.rs @@ -131,7 +131,7 @@ impl pubsub::Pubsub for Pubsub { async fn sub_subscribe(&mut self, self_: &Self::Sub, topic: &str) -> Result<(), Error> { match &self_.sub_implementor { SubImplementor::ConfluentApacheKafka(si) => si.subscribe(topic)?, - SubImplementor::Mosquitto(si) => si.subscribe(topic)?, + SubImplementor::Mosquitto(si) => si.subscribe(topic).await?, } Ok(())