-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathintegration_test.rs
79 lines (64 loc) · 2.48 KB
/
integration_test.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use minimq::{types::Utf8String, Minimq, Property, Publication, QoS, Will};
use embedded_nal::{self, IpAddr, Ipv4Addr};
use std_embedded_time::StandardClock;
/// End-to-end request/response round trip against an MQTT broker on localhost.
///
/// The test drives a single `Minimq` client through three phases:
///
/// 1. Once the client reports it is connected, subscribe to the `response` and
///    `request` topics.
/// 2. Once no subscriptions are pending, publish two `Ping` messages on
///    `request`, each carrying a `ResponseTopic` property of `response`
///    (the second one with `QoS::AtLeastOnce`).
/// 3. While polling, answer every inbound message for which
///    `Publication::respond` succeeds with a `Pong`, and count the messages
///    that arrive on `response`.
///
/// The test succeeds after two messages have been received on `response` and
/// the client reports no pending messages.
///
/// # Panics
///
/// Panics (failing the test) if any client operation returns an error, if an
/// inbound payload is not valid UTF-8, or if one of the pending-message
/// assertions does not hold.
///
/// NOTE(review): there is no timeout; presumably a reachable broker on
/// 127.0.0.1 is a precondition of running this test — confirm in CI setup.
#[test]
fn main() -> std::io::Result<()> {
    env_logger::init();

    // Will message handed to the client configuration: topic `exit`,
    // payload `Test complete`, no properties.
    let will = Will::new("exit", "Test complete".as_bytes(), &[]).unwrap();

    // Backing storage borrowed by the client configuration for its lifetime.
    let mut buffer = [0u8; 1024];
    let stack = std_embedded_nal::Stack;
    let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
    let mut mqtt: Minimq<'_, _, _, minimq::broker::IpBroker> = Minimq::new(
        stack,
        StandardClock::default(),
        minimq::ConfigBuilder::new(localhost.into(), &mut buffer)
            .will(will)
            .unwrap()
            .keepalive_interval(60),
    );

    // Phase flags: each step below must happen exactly once.
    let mut published = false;
    let mut subscribed = false;
    // Number of messages seen on the `response` topic so far.
    let mut responses = 0;

    loop {
        // Continually poll the client until there is no more data.
        while let Some(was_response) = mqtt
            .poll(|client, topic, payload, properties| {
                log::info!("{} < {}", topic, core::str::from_utf8(payload).unwrap());

                // Reply only when a response can be built from the inbound
                // properties (presumably: when they carry a response topic).
                if let Ok(response) = Publication::respond(properties, b"Pong") {
                    client.publish(response).unwrap();
                }

                // Tell the outer loop whether this message was a reply.
                topic == "response"
            })
            .unwrap()
        {
            if was_response {
                responses += 1;
                if responses == 2 {
                    // Both requests were answered; nothing may still be in flight.
                    assert!(!mqtt.client().pending_messages());
                    // Return through the test harness instead of calling
                    // `std::process::exit`, so destructors run and the
                    // harness records and reports the result normally.
                    return Ok(());
                }
            }
        }

        let client = mqtt.client();
        if !subscribed {
            if client.is_connected() {
                client
                    .subscribe(&["response".into(), "request".into()], &[])
                    .unwrap();
                subscribed = true;
            }
        } else if !client.subscriptions_pending() && !published {
            println!("PUBLISH request");
            let properties = [Property::ResponseTopic(Utf8String("response"))];

            // First request: QoS left at the `Publication` default.
            let publication = Publication::new("request", b"Ping").properties(&properties);
            client.publish(publication).unwrap();

            // Second request: explicitly at-least-once, so it needs an ack.
            let publication = Publication::new("request", b"Ping")
                .properties(&properties)
                .qos(QoS::AtLeastOnce);
            client.publish(publication).unwrap();

            // The message cannot be ack'd until the next poll call
            assert!(client.pending_messages());
            published = true;
        }
    }
}