msg: fix len for msg.Buffer and encode msg size as binary header

Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
Francis Bouvier
2024-11-28 00:38:33 +01:00
parent d95462073a
commit b800d0eeb8
4 changed files with 49 additions and 54 deletions

View File

@@ -19,6 +19,7 @@
const std = @import("std"); const std = @import("std");
const ws = @import("websocket"); const ws = @import("websocket");
const Msg = @import("msg.zig").Msg;
const log = std.log.scoped(.handler); const log = std.log.scoped(.handler);
@@ -45,7 +46,7 @@ pub const Stream = struct {
} }
fn closeCDP(self: *const Stream) void { fn closeCDP(self: *const Stream) void {
const close_msg: []const u8 = "5:close"; const close_msg: []const u8 = .{ 5, 0 } ++ "close";
self.recv(close_msg) catch |err| { self.recv(close_msg) catch |err| {
log.err("stream close error: {any}", .{err}); log.err("stream close error: {any}", .{err});
}; };
@@ -82,8 +83,10 @@ pub const Handler = struct {
self.stream.closeCDP(); self.stream.closeCDP();
} }
pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void { pub fn clientMessage(self: *Handler, data: []const u8) !void {
const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data }); var header: [2]u8 = undefined;
try self.stream.recv(msg); Msg.setSize(data.len, &header);
try self.stream.recv(&header);
try self.stream.recv(data);
} }
}; };

View File

@@ -25,6 +25,7 @@ const websocket = @import("websocket");
const Browser = @import("browser/browser.zig").Browser; const Browser = @import("browser/browser.zig").Browser;
const server = @import("server.zig"); const server = @import("server.zig");
const handler = @import("handler.zig"); const handler = @import("handler.zig");
const MaxSize = @import("msg.zig").MaxSize;
const parser = @import("netsurf"); const parser = @import("netsurf");
const apiweb = @import("apiweb.zig"); const apiweb = @import("apiweb.zig");
@@ -274,6 +275,8 @@ pub fn main() !void {
var ws = try websocket.Server(handler.Handler).init(alloc, .{ var ws = try websocket.Server(handler.Handler).init(alloc, .{
.port = opts.port, .port = opts.port,
.address = opts.host, .address = opts.host,
.max_message_size = MaxSize + 14, // overhead websocket
.max_conn = 1,
.handshake = .{ .handshake = .{
.timeout = 3, .timeout = 3,
.max_size = 1024, .max_size = 1024,

View File

@@ -18,6 +18,20 @@
const std = @import("std"); const std = @import("std");
pub const MsgSize = 16 * 1204; // 16KB
pub const HeaderSize = 2;
pub const MaxSize = HeaderSize + MsgSize;
pub const Msg = struct {
pub fn getSize(data: []const u8) usize {
return std.mem.readInt(u16, data[0..HeaderSize], .little);
}
pub fn setSize(len: usize, header: *[2]u8) void {
std.mem.writeInt(u16, header, @intCast(len), .little);
}
};
/// MsgBuffer returns messages from a raw text read stream, /// MsgBuffer returns messages from a raw text read stream,
/// according to the following format `<msg_size>:<msg>`. /// according to the following format `<msg_size>:<msg>`.
/// It handles both: /// It handles both:
@@ -26,21 +40,10 @@ const std = @import("std");
/// It's safe (and a good practice) to reuse the same MsgBuffer /// It's safe (and a good practice) to reuse the same MsgBuffer
/// on several reads of the same stream. /// on several reads of the same stream.
pub const MsgBuffer = struct { pub const MsgBuffer = struct {
size: usize = 0,
buf: []u8, buf: []u8,
size: usize = 0,
pos: usize = 0, pos: usize = 0,
const MaxSize = 1024 * 1024; // 1MB
pub fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer {
const buf = try alloc.alloc(u8, size);
return .{ .buf = buf };
}
pub fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void {
alloc.free(self.buf);
}
fn isFinished(self: *MsgBuffer) bool { fn isFinished(self: *MsgBuffer) bool {
return self.pos >= self.size; return self.pos >= self.size;
} }
@@ -55,7 +58,7 @@ pub const MsgBuffer = struct {
} }
// read input // read input
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct { pub fn read(self: *MsgBuffer, input: []const u8) !struct {
msg: []const u8, msg: []const u8,
left: []const u8, left: []const u8,
} { } {
@@ -64,11 +67,9 @@ pub const MsgBuffer = struct {
// msg size // msg size
var msg_size: usize = undefined; var msg_size: usize = undefined;
if (self.isEmpty()) { if (self.isEmpty()) {
// parse msg size metadata // decode msg size header
const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize; msg_size = Msg.getSize(_input);
const size_str = _input[0..size_pos]; _input = _input[HeaderSize..];
msg_size = try std.fmt.parseInt(u32, size_str, 10);
_input = _input[size_pos + 1 ..];
} else { } else {
msg_size = self.size; msg_size = self.size;
} }
@@ -90,17 +91,6 @@ pub const MsgBuffer = struct {
return error.MsgTooBig; return error.MsgTooBig;
} }
// check if the current input can fit in MsgBuffer
if (new_pos > self.buf.len) {
// we want to realloc at least:
// - a size big enough to fit the entire input (ie. new_pos)
// - a size big enough (ie. current msg size + starting buffer size)
// to avoid multiple reallocation
const new_size = @max(self.buf.len + self.size, new_pos);
// resize the MsgBuffer to fit
self.buf = try alloc.realloc(self.buf, new_size);
}
// copy the current input into MsgBuffer // copy the current input into MsgBuffer
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem) // NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/ // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
@@ -123,47 +113,45 @@ pub const MsgBuffer = struct {
} }
}; };
fn doTest(nb: *u8) void {
nb.* += 1;
}
test "MsgBuffer" { test "MsgBuffer" {
const Case = struct { const Case = struct {
input: []const u8, input: []const u8,
nb: u8, nb: u8,
}; };
const alloc = std.testing.allocator;
const cases = [_]Case{ const cases = [_]Case{
// simple // simple
.{ .input = "2:ok", .nb = 1 }, .{ .input = .{ 2, 0 } ++ "ok", .nb = 1 },
// combined // combined
.{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here .{ .input = .{ 2, 0 } ++ "ok" ++ .{ 3, 0 } ++ "foo", .nb = 2 },
// multipart // multipart
.{ .input = "9:multi", .nb = 0 }, .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
.{ .input = "part", .nb = 1 }, .{ .input = "part", .nb = 1 },
// multipart & combined // multipart & combined
.{ .input = "9:multi", .nb = 0 }, .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
.{ .input = "part2:ok", .nb = 2 }, .{ .input = "part" ++ .{ 2, 0 } ++ "ok", .nb = 2 },
// multipart & combined with other multipart // multipart & combined with other multipart
.{ .input = "9:multi", .nb = 0 }, .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 },
.{ .input = "part8:co", .nb = 1 }, .{ .input = "part" ++ .{ 8, 0 } ++ "co", .nb = 1 },
.{ .input = "mbined", .nb = 1 }, .{ .input = "mbined", .nb = 1 },
// several multipart // several multipart
.{ .input = "23:multi", .nb = 0 }, .{ .input = .{ 23, 0 } ++ "multi", .nb = 0 },
.{ .input = "several", .nb = 0 }, .{ .input = "several", .nb = 0 },
.{ .input = "complex", .nb = 0 }, .{ .input = "complex", .nb = 0 },
.{ .input = "part", .nb = 1 }, .{ .input = "part", .nb = 1 },
// combined & multipart // combined & multipart
.{ .input = "2:ok9:multi", .nb = 1 }, .{ .input = .{ 2, 0 } ++ "ok" ++ .{ 9, 0 } ++ "multi", .nb = 1 },
.{ .input = "part", .nb = 1 }, .{ .input = "part", .nb = 1 },
}; };
var msg_buf = try MsgBuffer.init(alloc, 10);
defer msg_buf.deinit(alloc); var buf: [MaxSize]u8 = undefined;
var msg_buf = MsgBuffer{ .buf = &buf };
for (cases) |case| { for (cases) |case| {
var nb: u8 = 0; var nb: u8 = 0;
var input: []const u8 = case.input; var input = case.input;
while (input.len > 0) { while (input.len > 0) {
const parts = msg_buf.read(alloc, input) catch |err| { const parts = msg_buf.read(input) catch |err| {
if (err == error.MsgMultipart) break; // go to the next case input if (err == error.MsgMultipart) break; // go to the next case input
return err; return err;
}; };

View File

@@ -31,6 +31,7 @@ const CancelError = jsruntime.IO.CancelError;
const TimeoutError = jsruntime.IO.TimeoutError; const TimeoutError = jsruntime.IO.TimeoutError;
const MsgBuffer = @import("msg.zig").MsgBuffer; const MsgBuffer = @import("msg.zig").MsgBuffer;
const MaxSize = @import("msg.zig").MaxSize;
const Browser = @import("browser/browser.zig").Browser; const Browser = @import("browser/browser.zig").Browser;
const cdp = @import("cdp/cdp.zig"); const cdp = @import("cdp/cdp.zig");
@@ -161,7 +162,7 @@ pub const Ctx = struct {
// read and execute input // read and execute input
var input: []const u8 = self.read_buf[0..size]; var input: []const u8 = self.read_buf[0..size];
while (input.len > 0) { while (input.len > 0) {
const parts = self.msg_buf.read(self.alloc(), input) catch |err| { const parts = self.msg_buf.read(input) catch |err| {
if (err == error.MsgMultipart) { if (err == error.MsgMultipart) {
return; return;
} else { } else {
@@ -434,8 +435,8 @@ pub fn handle(
// create buffers // create buffers
var read_buf: [BufReadSize]u8 = undefined; var read_buf: [BufReadSize]u8 = undefined;
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB var buf: [MaxSize]u8 = undefined;
defer msg_buf.deinit(loop.alloc); var msg_buf = MsgBuffer{ .buf = &buf };
// create I/O completions // create I/O completions
var accept_completion: Completion = undefined; var accept_completion: Completion = undefined;