-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexamples.zig
76 lines (59 loc) · 2.35 KB
/
examples.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
const std = @import("std");
const zenoh = @import("zenoh");
/// Opens a zenoh session with the default configuration and publishes the
/// static string "hello world" on the key expression "key/expression".
///
/// Any error returned while creating the config, opening the session,
/// building the payload, or performing the put is propagated to the caller.
/// Everything acquired here is released through `defer` before returning.
fn publish() !void {
    var default_config = try zenoh.Config.initDefault();
    defer default_config.deinit();

    const open_options = zenoh.Session.OpenOptions.init();
    var publisher_session = try zenoh.Session.open(&default_config, &open_options);
    defer publisher_session.deinit();

    var payload = try zenoh.Bytes.initFromStaticString("hello world");
    defer payload.deinit();

    var put_options = zenoh.Session.PutOptions.init();
    try publisher_session.put("key/expression", &payload, &put_options);
}
// NOTE(review): the writer is a subscriber callback and the readers run on
// threads spawned by the test; presumably these are different threads, in
// which case plain loads/stores of this flag are a data race — confirm.
/// Records whether a sample has been received. `data_handler` sets it to true,
/// `subscribe` polls it while waiting, and the test asserts it at the end.
var got_message: bool = false;
/// Subscriber callback: converts the payload of `sample` to a string, logs it,
/// and sets `got_message` to record that a message arrived.
///
/// `arg` is ignored. If the payload cannot be converted to a string, an error
/// is logged and `got_message` is left unchanged.
fn data_handler(sample: [*c]zenoh.c.z_loaned_sample_t, arg: ?*anyopaque) callconv(.c) void {
    _ = arg;

    const payload = zenoh.c.z_sample_payload(sample);

    var string: zenoh.c.z_owned_string_t = undefined;
    // A non-zero result code means the conversion failed; only read `string`
    // when it succeeded, since it starts out as `undefined`.
    const result = zenoh.c.z_bytes_to_string(payload, &string);
    if (result != 0) {
        std.log.err("failed to convert sample payload to string (code {d})", .{result});
        return;
    }
    // The conversion hands us an owned string; drop it once we are done so
    // each received sample does not leak its decoded copy.
    defer zenoh.c.z_string_drop(zenoh.c.z_string_move(&string));

    const loaned = zenoh.loan(&string);
    const len = zenoh.c.z_string_len(loaned);
    // Do not slice the data pointer for an empty string: it may be null.
    const text: []const u8 = if (len == 0) "" else zenoh.c.z_string_data(loaned)[0..len];
    std.log.info("Got sample: {s}", .{text});

    // The flag is polled from another thread, so publish it atomically.
    @atomicStore(bool, &got_message, true, .release);
}
/// Opens a zenoh session with the default configuration, declares a
/// subscriber on "key/expression" whose callback is `data_handler`, and waits
/// until `got_message` becomes true.
///
/// The flag is polled once per second for up to ten seconds. Returns
/// `error.NoMessage` if the timeout elapses without the flag being set;
/// errors from config creation, session opening, key-expression creation and
/// subscriber declaration are propagated. Panics if no timer is available.
fn subscribe() !void {
    var config = try zenoh.Config.initDefault();
    defer config.deinit();

    var session = try zenoh.Session.open(&config, &zenoh.Session.OpenOptions.init());
    defer session.deinit();

    // Single closure wrapping the callback; no context pointer and no drop
    // hook are supplied.
    var closure = zenoh.ClosureSample.init(&data_handler, null, null);
    defer closure.deinit();

    var key_expr = try zenoh.KeyExpr.initFromStr("key/expression");
    defer key_expr.deinit();

    var subscriber_options = zenoh.Session.SubscriberOptions.init();
    var subscriber = try session.declareSubscriber(&key_expr, &closure, &subscriber_options);
    defer subscriber.deinit();

    const timeout_ns = 10 * std.time.ns_per_s;
    const poll_interval_ns = 1 * std.time.ns_per_s;

    var timer = std.time.Timer.start() catch @panic("timer unsupported");
    while (timer.read() <= timeout_ns) {
        // The flag is written by the subscriber callback, so read it
        // atomically rather than with a plain load.
        if (@atomicLoad(bool, &got_message, .acquire)) return;
        std.Thread.sleep(poll_interval_ns);
    }
    return error.NoMessage;
}
// Runs `subscribe` and `publish` on separate threads and checks that the
// subscriber callback observed a message.
test "pubsub between two threads" {
    {
        const sub_thread = try std.Thread.spawn(.{ .allocator = null }, subscribe, .{});
        // Join via defer so the subscriber thread is not abandoned if
        // spawning the publisher below fails.
        defer sub_thread.join();

        // Give the subscriber a head start before the publisher is started.
        std.Thread.sleep(std.time.ns_per_s * 1);

        const pub_thread = try std.Thread.spawn(.{ .allocator = null }, publish, .{});
        defer pub_thread.join();
    }

    // Both threads have been joined at this point, so the flag is final.
    try std.testing.expect(got_message);
}