Skip to content

Commit b846541

Browse files
Merge pull request #311 from lightpanda-io/msg_size_encode
Msg size encode
2 parents 3088c7a + 8f297b8 commit b846541

File tree

4 files changed

+60
-65
lines changed

4 files changed

+60
-65
lines changed

src/handler.zig

+7-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
const std = @import("std");
2020

2121
const ws = @import("websocket");
22+
const Msg = @import("msg.zig").Msg;
2223

2324
const log = std.log.scoped(.handler);
2425

@@ -45,7 +46,7 @@ pub const Stream = struct {
4546
}
4647

4748
fn closeCDP(self: *const Stream) void {
48-
const close_msg: []const u8 = "5:close";
49+
const close_msg: []const u8 = .{ 5, 0 } ++ "close";
4950
self.recv(close_msg) catch |err| {
5051
log.err("stream close error: {any}", .{err});
5152
};
@@ -82,8 +83,10 @@ pub const Handler = struct {
8283
self.stream.closeCDP();
8384
}
8485

85-
pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void {
86-
const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data });
87-
try self.stream.recv(msg);
86+
pub fn clientMessage(self: *Handler, data: []const u8) !void {
87+
var header: [2]u8 = undefined;
88+
Msg.setSize(data.len, &header);
89+
try self.stream.recv(&header);
90+
try self.stream.recv(data);
8891
}
8992
};

src/main.zig

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const websocket = @import("websocket");
2525
const Browser = @import("browser/browser.zig").Browser;
2626
const server = @import("server.zig");
2727
const handler = @import("handler.zig");
28+
const MaxSize = @import("msg.zig").MaxSize;
2829

2930
const parser = @import("netsurf");
3031
const apiweb = @import("apiweb.zig");
@@ -274,6 +275,8 @@ pub fn main() !void {
274275
var ws = try websocket.Server(handler.Handler).init(alloc, .{
275276
.port = opts.port,
276277
.address = opts.host,
278+
.max_message_size = MaxSize + 14, // overhead websocket
279+
.max_conn = 1,
277280
.handshake = .{
278281
.timeout = 3,
279282
.max_size = 1024,

src/msg.zig

+45-57
Original file line numberDiff line numberDiff line change
@@ -18,44 +18,47 @@
1818

1919
const std = @import("std");
2020

21-
/// MsgBuffer returns messages from a raw text read stream,
22-
/// according to the following format `<msg_size>:<msg>`.
21+
pub const MsgSize = 16 * 1204; // 16KB
22+
pub const HeaderSize = 2;
23+
pub const MaxSize = HeaderSize + MsgSize;
24+
25+
pub const Msg = struct {
26+
pub fn getSize(data: []const u8) usize {
27+
return std.mem.readInt(u16, data[0..HeaderSize], .little);
28+
}
29+
30+
pub fn setSize(len: usize, header: *[2]u8) void {
31+
std.mem.writeInt(u16, header, @intCast(len), .little);
32+
}
33+
};
34+
35+
/// Buffer returns messages from a raw text read stream,
36+
/// with the message size being encoded on the 2 first bytes (little endian)
2337
/// It handles both:
2438
/// - combined messages in one read
2539
/// - single message in several reads (multipart)
26-
/// It's safe (and a good practice) to reuse the same MsgBuffer
40+
/// It's safe (and a good practice) to reuse the same Buffer
2741
/// on several reads of the same stream.
28-
pub const MsgBuffer = struct {
29-
size: usize = 0,
42+
pub const Buffer = struct {
3043
buf: []u8,
44+
size: usize = 0,
3145
pos: usize = 0,
3246

33-
const MaxSize = 1024 * 1024; // 1MB
34-
35-
pub fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer {
36-
const buf = try alloc.alloc(u8, size);
37-
return .{ .buf = buf };
38-
}
39-
40-
pub fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void {
41-
alloc.free(self.buf);
42-
}
43-
44-
fn isFinished(self: *MsgBuffer) bool {
47+
fn isFinished(self: *const Buffer) bool {
4548
return self.pos >= self.size;
4649
}
4750

48-
fn isEmpty(self: MsgBuffer) bool {
51+
fn isEmpty(self: *const Buffer) bool {
4952
return self.size == 0 and self.pos == 0;
5053
}
5154

52-
fn reset(self: *MsgBuffer) void {
55+
fn reset(self: *Buffer) void {
5356
self.size = 0;
5457
self.pos = 0;
5558
}
5659

5760
// read input
58-
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct {
61+
pub fn read(self: *Buffer, input: []const u8) !struct {
5962
msg: []const u8,
6063
left: []const u8,
6164
} {
@@ -64,11 +67,9 @@ pub const MsgBuffer = struct {
6467
// msg size
6568
var msg_size: usize = undefined;
6669
if (self.isEmpty()) {
67-
// parse msg size metadata
68-
const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize;
69-
const size_str = _input[0..size_pos];
70-
msg_size = try std.fmt.parseInt(u32, size_str, 10);
71-
_input = _input[size_pos + 1 ..];
70+
// decode msg size header
71+
msg_size = Msg.getSize(_input);
72+
_input = _input[HeaderSize..];
7273
} else {
7374
msg_size = self.size;
7475
}
@@ -77,7 +78,7 @@ pub const MsgBuffer = struct {
7778
const is_multipart = !self.isEmpty() or _input.len < msg_size;
7879
if (is_multipart) {
7980

80-
// set msg size on empty MsgBuffer
81+
// set msg size on empty Buffer
8182
if (self.isEmpty()) {
8283
self.size = msg_size;
8384
}
@@ -90,18 +91,7 @@ pub const MsgBuffer = struct {
9091
return error.MsgTooBig;
9192
}
9293

93-
// check if the current input can fit in MsgBuffer
94-
if (new_pos > self.buf.len) {
95-
// we want to realloc at least:
96-
// - a size big enough to fit the entire input (ie. new_pos)
97-
// - a size big enough (ie. current msg size + starting buffer size)
98-
// to avoid multiple reallocation
99-
const new_size = @max(self.buf.len + self.size, new_pos);
100-
// resize the MsgBuffer to fit
101-
self.buf = try alloc.realloc(self.buf, new_size);
102-
}
103-
104-
// copy the current input into MsgBuffer
94+
// copy the current input into Buffer
10595
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
10696
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
10797
// Intead we just use std.mem.copyForwards
@@ -123,47 +113,45 @@ pub const MsgBuffer = struct {
123113
}
124114
};
125115

126-
fn doTest(nb: *u8) void {
127-
nb.* += 1;
128-
}
129-
130-
test "MsgBuffer" {
116+
test "Buffer" {
131117
const Case = struct {
132118
input: []const u8,
133119
nb: u8,
134120
};
135-
const alloc = std.testing.allocator;
121+
136122
const cases = [_]Case{
137123
// simple
138-
.{ .input = "2:ok", .nb = 1 },
124+
.{ .input = .{ 2, 0 } ++ "ok", .nb = 1 },
139125
// combined
140-
.{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here
126+
.{ .input = .{ 2, 0 } ++ "ok" ++ .{ 3, 0 } ++ "foo", .nb = 2 },
141127
// multipart
142-
.{ .input = "9:multi", .nb = 0 },
128+
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
143129
.{ .input = "part", .nb = 1 },
144130
// multipart & combined
145-
.{ .input = "9:multi", .nb = 0 },
146-
.{ .input = "part2:ok", .nb = 2 },
131+
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
132+
.{ .input = "part" ++ .{ 2, 0 } ++ "ok", .nb = 2 },
147133
// multipart & combined with other multipart
148-
.{ .input = "9:multi", .nb = 0 },
149-
.{ .input = "part8:co", .nb = 1 },
134+
.{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
135+
.{ .input = "part" ++ .{ 8, 0 } ++ "co", .nb = 1 },
150136
.{ .input = "mbined", .nb = 1 },
151137
// several multipart
152-
.{ .input = "23:multi", .nb = 0 },
138+
.{ .input = .{ 23, 0 } ++ "multi", .nb = 0 },
153139
.{ .input = "several", .nb = 0 },
154140
.{ .input = "complex", .nb = 0 },
155141
.{ .input = "part", .nb = 1 },
156142
// combined & multipart
157-
.{ .input = "2:ok9:multi", .nb = 1 },
143+
.{ .input = .{ 2, 0 } ++ "ok" ++ .{ 9, 0 } ++ "multi", .nb = 1 },
158144
.{ .input = "part", .nb = 1 },
159145
};
160-
var msg_buf = try MsgBuffer.init(alloc, 10);
161-
defer msg_buf.deinit(alloc);
146+
147+
var b: [MaxSize]u8 = undefined;
148+
var buf = Buffer{ .buf = &b };
149+
162150
for (cases) |case| {
163151
var nb: u8 = 0;
164-
var input: []const u8 = case.input;
152+
var input = case.input;
165153
while (input.len > 0) {
166-
const parts = msg_buf.read(alloc, input) catch |err| {
154+
const parts = buf.read(input) catch |err| {
167155
if (err == error.MsgMultipart) break; // go to the next case input
168156
return err;
169157
};

src/server.zig

+5-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ const CloseError = jsruntime.IO.CloseError;
3030
const CancelError = jsruntime.IO.CancelError;
3131
const TimeoutError = jsruntime.IO.TimeoutError;
3232

33-
const MsgBuffer = @import("msg.zig").MsgBuffer;
33+
const MsgBuffer = @import("msg.zig").Buffer;
34+
const MaxSize = @import("msg.zig").MaxSize;
3435
const Browser = @import("browser/browser.zig").Browser;
3536
const cdp = @import("cdp/cdp.zig");
3637

@@ -161,7 +162,7 @@ pub const Ctx = struct {
161162
// read and execute input
162163
var input: []const u8 = self.read_buf[0..size];
163164
while (input.len > 0) {
164-
const parts = self.msg_buf.read(self.alloc(), input) catch |err| {
165+
const parts = self.msg_buf.read(input) catch |err| {
165166
if (err == error.MsgMultipart) {
166167
return;
167168
} else {
@@ -434,8 +435,8 @@ pub fn handle(
434435

435436
// create buffers
436437
var read_buf: [BufReadSize]u8 = undefined;
437-
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB
438-
defer msg_buf.deinit(loop.alloc);
438+
var buf: [MaxSize]u8 = undefined;
439+
var msg_buf = MsgBuffer{ .buf = &buf };
439440

440441
// create I/O completions
441442
var accept_completion: Completion = undefined;

0 commit comments

Comments
 (0)