diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index f6ffc5de0..1678caff9 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -225,10 +225,11 @@ impl MqttState { .ok_or(StateError::Unsolicited(puback.pkid))?; self.last_puback = puback.pkid; - publish.take().ok_or({ + + if publish.take().is_none() { error!("Unsolicited puback packet: {:?}", puback.pkid); - StateError::Unsolicited(puback.pkid) - })?; + return Err(StateError::Unsolicited(puback.pkid)); + } self.inflight -= 1; let packet = self.check_collision(puback.pkid).map(|publish| { @@ -250,21 +251,19 @@ impl MqttState { .outgoing_pub .get_mut(pubrec.pkid as usize) .ok_or(StateError::Unsolicited(pubrec.pkid))?; - match publish.take() { - Some(_) => { - // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); - let pubrel = PubRel { pkid: pubrec.pkid }; - let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); - self.events.push_back(event); - Ok(Some(Packet::PubRel(pubrel))) - } - None => { - error!("Unsolicited pubrec packet: {:?}", pubrec.pkid); - Err(StateError::Unsolicited(pubrec.pkid)) - } + if publish.take().is_none() { + error!("Unsolicited pubrec packet: {:?}", pubrec.pkid); + return Err(StateError::Unsolicited(pubrec.pkid)); } + + // NOTE: Inflight - 1 for qos2 in comp + self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); + let pubrel = PubRel { pkid: pubrec.pkid }; + let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); + self.events.push_back(event); + + Ok(Some(Packet::PubRel(pubrel))) } fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result, StateError> { @@ -272,30 +271,30 @@ impl MqttState { .incoming_pub .get_mut(pubrel.pkid as usize) .ok_or(StateError::Unsolicited(pubrel.pkid))?; - match publish.take() { - Some(_) => { - let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); - let pubcomp = PubComp { pkid: pubrel.pkid }; - self.events.push_back(event); - Ok(Some(Packet::PubComp(pubcomp))) - } - None => { - error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); - Err(StateError::Unsolicited(pubrel.pkid)) - } + if publish.take().is_none() { + error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); + return Err(StateError::Unsolicited(pubrel.pkid)); } + + let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); + let pubcomp = PubComp { pkid: pubrel.pkid }; + self.events.push_back(event); + + Ok(Some(Packet::PubComp(pubcomp))) } fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result, StateError> { - self.outgoing_rel + if self + .outgoing_rel .get_mut(pubcomp.pkid as usize) .ok_or(StateError::Unsolicited(pubcomp.pkid))? .take() - .ok_or({ - error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); - StateError::Unsolicited(pubcomp.pkid) - })?; + .is_none() + { + error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); + return Err(StateError::Unsolicited(pubcomp.pkid)); + } self.inflight -= 1; let packet = self.check_collision(pubcomp.pkid).map(|publish| { diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 456272b4d..95c9c8967 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -362,17 +362,13 @@ impl MqttState { .outgoing_pub .get_mut(puback.pkid as usize) .ok_or(StateError::Unsolicited(puback.pkid))?; - let v = match publish.take() { - Some(_) => { - self.inflight -= 1; - Ok(None) - } - None => { - error!("Unsolicited puback packet: {:?}", puback.pkid); - Err(StateError::Unsolicited(puback.pkid)) - } - }; + if publish.take().is_none() { + error!("Unsolicited puback packet: {:?}", puback.pkid); + return Err(StateError::Unsolicited(puback.pkid)); + } + + self.inflight -= 1; if puback.reason != PubAckReason::Success && puback.reason != PubAckReason::NoMatchingSubscribers @@ -394,7 +390,7 @@ impl MqttState { return Ok(Some(Packet::Publish(publish))); } - v + Ok(None) } fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result, StateError> { @@ -402,28 +398,26 @@ impl MqttState { .outgoing_pub .get_mut(pubrec.pkid as usize) .ok_or(StateError::Unsolicited(pubrec.pkid))?; - match publish.take() { - Some(_) => { - if pubrec.reason != PubRecReason::Success - && pubrec.reason != PubRecReason::NoMatchingSubscribers - { - return Err(StateError::PubRecFail { - reason: pubrec.reason, - }); - } - // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); - let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); - self.events.push_back(event); + if publish.take().is_none() { + error!("Unsolicited pubrec packet: {:?}", pubrec.pkid); + return Err(StateError::Unsolicited(pubrec.pkid)); + } - Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None)))) - } - None => { - error!("Unsolicited pubrec packet: {:?}", pubrec.pkid); - Err(StateError::Unsolicited(pubrec.pkid)) - } + if pubrec.reason != PubRecReason::Success + && pubrec.reason != PubRecReason::NoMatchingSubscribers + { + return Err(StateError::PubRecFail { + reason: pubrec.reason, + }); } + + // NOTE: Inflight - 1 for qos2 in comp + self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); + let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); + self.events.push_back(event); + + Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None)))) } fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result, StateError> { @@ -431,24 +425,22 @@ impl MqttState { .incoming_pub .get_mut(pubrel.pkid as usize) .ok_or(StateError::Unsolicited(pubrel.pkid))?; - match publish.take() { - Some(_) => { - if pubrel.reason != PubRelReason::Success { - return Err(StateError::PubRelFail { - reason: pubrel.reason, - }); - } - let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); - self.events.push_back(event); + if publish.take().is_none() { + error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); + return Err(StateError::Unsolicited(pubrel.pkid)); + } - Ok(Some(Packet::PubComp(PubComp::new(pubrel.pkid, None)))) - } - None => { - error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); - Err(StateError::Unsolicited(pubrel.pkid)) - } + if pubrel.reason != PubRelReason::Success { + return Err(StateError::PubRelFail { + reason: pubrel.reason, + }); } + + let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid)); + self.events.push_back(event); + + Ok(Some(Packet::PubComp(PubComp::new(pubrel.pkid, None)))) } fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result, StateError> { @@ -465,22 +457,19 @@ impl MqttState { .outgoing_rel .get_mut(pubcomp.pkid as usize) .ok_or(StateError::Unsolicited(pubcomp.pkid))?; - match pubrel.take() { - Some(_) => { - if pubcomp.reason != PubCompReason::Success { - return Err(StateError::PubCompFail { - reason: pubcomp.reason, - }); - } + if pubrel.take().is_none() { + error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); + return Err(StateError::Unsolicited(pubcomp.pkid)); + } - self.inflight -= 1; - Ok(outgoing) - } - None => { - error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); - Err(StateError::Unsolicited(pubcomp.pkid)) - } + if pubcomp.reason != PubCompReason::Success { + return Err(StateError::PubCompFail { + reason: pubcomp.reason, + }); } + + self.inflight -= 1; + Ok(outgoing) } fn handle_incoming_pingresp(&mut self) -> Result, StateError> {