From b800d0eeb839e3c3b1af42de24a600cf18c3d451 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Thu, 28 Nov 2024 00:38:33 +0100 Subject: [PATCH 1/2] msg: fix len for msg.Buffer and encode msg size as binary header Signed-off-by: Francis Bouvier --- src/handler.zig | 11 ++++--- src/main.zig | 3 ++ src/msg.zig | 82 +++++++++++++++++++++---------------------------- src/server.zig | 7 +++-- 4 files changed, 49 insertions(+), 54 deletions(-) diff --git a/src/handler.zig b/src/handler.zig index 4a9ba06a..d155cf35 100644 --- a/src/handler.zig +++ b/src/handler.zig @@ -19,6 +19,7 @@ const std = @import("std"); const ws = @import("websocket"); +const Msg = @import("msg.zig").Msg; const log = std.log.scoped(.handler); @@ -45,7 +46,7 @@ pub const Stream = struct { } fn closeCDP(self: *const Stream) void { - const close_msg: []const u8 = "5:close"; + const close_msg: []const u8 = .{ 5, 0 } ++ "close"; self.recv(close_msg) catch |err| { log.err("stream close error: {any}", .{err}); }; @@ -82,8 +83,10 @@ pub const Handler = struct { self.stream.closeCDP(); } - pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void { - const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data }); - try self.stream.recv(msg); + pub fn clientMessage(self: *Handler, data: []const u8) !void { + var header: [2]u8 = undefined; + Msg.setSize(data.len, &header); + try self.stream.recv(&header); + try self.stream.recv(data); } }; diff --git a/src/main.zig b/src/main.zig index 82d2374f..e3cda739 100644 --- a/src/main.zig +++ b/src/main.zig @@ -25,6 +25,7 @@ const websocket = @import("websocket"); const Browser = @import("browser/browser.zig").Browser; const server = @import("server.zig"); const handler = @import("handler.zig"); +const MaxSize = @import("msg.zig").MaxSize; const parser = @import("netsurf"); const apiweb = @import("apiweb.zig"); @@ -274,6 +275,8 @@ pub fn main() !void { var ws = try websocket.Server(handler.Handler).init(alloc, .{ .port = opts.port, .address = opts.host, + .max_message_size = MaxSize + 14, // overhead websocket + .max_conn = 1, .handshake = .{ .timeout = 3, .max_size = 1024, diff --git a/src/msg.zig b/src/msg.zig index 64a81bd9..7ff5e5ec 100644 --- a/src/msg.zig +++ b/src/msg.zig @@ -18,6 +18,20 @@ const std = @import("std"); +pub const MsgSize = 16 * 1204; // 16KB +pub const HeaderSize = 2; +pub const MaxSize = HeaderSize + MsgSize; + +pub const Msg = struct { + pub fn getSize(data: []const u8) usize { + return std.mem.readInt(u16, data[0..HeaderSize], .little); + } + + pub fn setSize(len: usize, header: *[2]u8) void { + std.mem.writeInt(u16, header, @intCast(len), .little); + } +}; + /// MsgBuffer returns messages from a raw text read stream, /// according to the following format `:`. /// It handles both: @@ -26,21 +40,10 @@ const std = @import("std"); /// It's safe (and a good practice) to reuse the same MsgBuffer /// on several reads of the same stream. pub const MsgBuffer = struct { - size: usize = 0, buf: []u8, + size: usize = 0, pos: usize = 0, - const MaxSize = 1024 * 1024; // 1MB - - pub fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer { - const buf = try alloc.alloc(u8, size); - return .{ .buf = buf }; - } - - pub fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void { - alloc.free(self.buf); - } - fn isFinished(self: *MsgBuffer) bool { return self.pos >= self.size; } @@ -55,7 +58,7 @@ pub const MsgBuffer = struct { } // read input - pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct { + pub fn read(self: *MsgBuffer, input: []const u8) !struct { msg: []const u8, left: []const u8, } { @@ -64,11 +67,9 @@ pub const MsgBuffer = struct { // msg size var msg_size: usize = undefined; if (self.isEmpty()) { - // parse msg size metadata - const size_pos = std.mem.indexOfScalar(u8, _input, ':') orelse return error.InputWithoutSize; - const size_str = _input[0..size_pos]; - msg_size = try std.fmt.parseInt(u32, size_str, 10); - _input = _input[size_pos + 1 ..]; + // decode msg size header + msg_size = Msg.getSize(_input); + _input = _input[HeaderSize..]; } else { msg_size = self.size; } @@ -90,17 +91,6 @@ pub const MsgBuffer = struct { return error.MsgTooBig; } - // check if the current input can fit in MsgBuffer - if (new_pos > self.buf.len) { - // we want to realloc at least: - // - a size big enough to fit the entire input (ie. new_pos) - // - a size big enough (ie. current msg size + starting buffer size) - // to avoid multiple reallocation - const new_size = @max(self.buf.len + self.size, new_pos); - // resize the MsgBuffer to fit - self.buf = try alloc.realloc(self.buf, new_size); - } - // copy the current input into MsgBuffer // NOTE: we could use @memcpy but it's not Thread-safe (alias problem) // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/ @@ -123,47 +113,45 @@ pub const MsgBuffer = struct { } }; -fn doTest(nb: *u8) void { - nb.* += 1; -} - test "MsgBuffer" { const Case = struct { input: []const u8, nb: u8, }; - const alloc = std.testing.allocator; + const cases = [_]Case{ // simple - .{ .input = "2:ok", .nb = 1 }, + .{ .input = .{ 2, 0 } ++ "ok", .nb = 1 }, // combined - .{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here + .{ .input = .{ 2, 0 } ++ "ok" ++ .{ 3, 0 } ++ "foo", .nb = 2 }, // multipart - .{ .input = "9:multi", .nb = 0 }, + .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 }, .{ .input = "part", .nb = 1 }, // multipart & combined - .{ .input = "9:multi", .nb = 0 }, - .{ .input = "part2:ok", .nb = 2 }, + .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 }, + .{ .input = "part" ++ .{ 2, 0 } ++ "ok", .nb = 2 }, // multipart & combined with other multipart - .{ .input = "9:multi", .nb = 0 }, - .{ .input = "part8:co", .nb = 1 }, + .{ .input = .{ 9, 0 } ++ "multi", .nb = 0 }, + .{ .input = "part" ++ .{ 8, 0 } ++ "co", .nb = 1 }, .{ .input = "mbined", .nb = 1 }, // several multipart - .{ .input = "23:multi", .nb = 0 }, + .{ .input = .{ 23, 0 } ++ "multi", .nb = 0 }, .{ .input = "several", .nb = 0 }, .{ .input = "complex", .nb = 0 }, .{ .input = "part", .nb = 1 }, // combined & multipart - .{ .input = "2:ok9:multi", .nb = 1 }, + .{ .input = .{ 2, 0 } ++ "ok" ++ .{ 9, 0 } ++ "multi", .nb = 1 }, .{ .input = "part", .nb = 1 }, }; - var msg_buf = try MsgBuffer.init(alloc, 10); - defer msg_buf.deinit(alloc); + + var buf: [MaxSize]u8 = undefined; + var msg_buf = MsgBuffer{ .buf = &buf }; + for (cases) |case| { var nb: u8 = 0; - var input: []const u8 = case.input; + var input = case.input; while (input.len > 0) { - const parts = msg_buf.read(alloc, input) catch |err| { + const parts = msg_buf.read(input) catch |err| { if (err == error.MsgMultipart) break; // go to the next case input return err; }; diff --git a/src/server.zig b/src/server.zig index 7d2a41a5..102a14c3 100644 --- a/src/server.zig +++ b/src/server.zig @@ -31,6 +31,7 @@ const CancelError = jsruntime.IO.CancelError; const TimeoutError = jsruntime.IO.TimeoutError; const MsgBuffer = @import("msg.zig").MsgBuffer; +const MaxSize = @import("msg.zig").MaxSize; const Browser = @import("browser/browser.zig").Browser; const cdp = @import("cdp/cdp.zig"); @@ -161,7 +162,7 @@ pub const Ctx = struct { // read and execute input var input: []const u8 = self.read_buf[0..size]; while (input.len > 0) { - const parts = self.msg_buf.read(self.alloc(), input) catch |err| { + const parts = self.msg_buf.read(input) catch |err| { if (err == error.MsgMultipart) { return; } else { @@ -434,8 +435,8 @@ pub fn handle( // create buffers var read_buf: [BufReadSize]u8 = undefined; - var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB - defer msg_buf.deinit(loop.alloc); + var buf: [MaxSize]u8 = undefined; + var msg_buf = MsgBuffer{ .buf = &buf }; // create I/O completions var accept_completion: Completion = undefined; From 8f297b83c14ce3b22cf2f145d8d0d26bb0f062ba Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Thu, 28 Nov 2024 00:50:44 +0100 Subject: [PATCH 2/2] msg: rename MsgBuffer -> Buffer Signed-off-by: Francis Bouvier --- src/msg.zig | 28 ++++++++++++++-------------- src/server.zig | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/msg.zig b/src/msg.zig index 7ff5e5ec..42d318dd 100644 --- a/src/msg.zig +++ b/src/msg.zig @@ -32,33 +32,33 @@ pub const Msg = struct { } }; -/// MsgBuffer returns messages from a raw text read stream, -/// according to the following format `:`. +/// Buffer returns messages from a raw text read stream, +/// with the message size being encoded on the 2 first bytes (little endian) /// It handles both: /// - combined messages in one read /// - single message in several reads (multipart) -/// It's safe (and a good practice) to reuse the same MsgBuffer +/// It's safe (and a good practice) to reuse the same Buffer /// on several reads of the same stream. -pub const MsgBuffer = struct { +pub const Buffer = struct { buf: []u8, size: usize = 0, pos: usize = 0, - fn isFinished(self: *MsgBuffer) bool { + fn isFinished(self: *const Buffer) bool { return self.pos >= self.size; } - fn isEmpty(self: MsgBuffer) bool { + fn isEmpty(self: *const Buffer) bool { return self.size == 0 and self.pos == 0; } - fn reset(self: *MsgBuffer) void { + fn reset(self: *Buffer) void { self.size = 0; self.pos = 0; } // read input - pub fn read(self: *MsgBuffer, input: []const u8) !struct { + pub fn read(self: *Buffer, input: []const u8) !struct { msg: []const u8, left: []const u8, } { @@ -78,7 +78,7 @@ pub const MsgBuffer = struct { const is_multipart = !self.isEmpty() or _input.len < msg_size; if (is_multipart) { - // set msg size on empty MsgBuffer + // set msg size on empty Buffer if (self.isEmpty()) { self.size = msg_size; } @@ -91,7 +91,7 @@ pub const MsgBuffer = struct { return error.MsgTooBig; } - // copy the current input into MsgBuffer + // copy the current input into Buffer // NOTE: we could use @memcpy but it's not Thread-safe (alias problem) // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/ // Intead we just use std.mem.copyForwards @@ -113,7 +113,7 @@ pub const MsgBuffer = struct { } }; -test "MsgBuffer" { +test "Buffer" { const Case = struct { input: []const u8, nb: u8, @@ -144,14 +144,14 @@ test "MsgBuffer" { .{ .input = "part", .nb = 1 }, }; - var buf: [MaxSize]u8 = undefined; - var msg_buf = MsgBuffer{ .buf = &buf }; + var b: [MaxSize]u8 = undefined; + var buf = Buffer{ .buf = &b }; for (cases) |case| { var nb: u8 = 0; var input = case.input; while (input.len > 0) { - const parts = msg_buf.read(input) catch |err| { + const parts = buf.read(input) catch |err| { if (err == error.MsgMultipart) break; // go to the next case input return err; }; diff --git a/src/server.zig b/src/server.zig index 102a14c3..5e83c009 100644 --- a/src/server.zig +++ b/src/server.zig @@ -30,7 +30,7 @@ const CloseError = jsruntime.IO.CloseError; const CancelError = jsruntime.IO.CancelError; const TimeoutError = jsruntime.IO.TimeoutError; -const MsgBuffer = @import("msg.zig").MsgBuffer; +const MsgBuffer = @import("msg.zig").Buffer; const MaxSize = @import("msg.zig").MaxSize; const Browser = @import("browser/browser.zig").Browser; const cdp = @import("cdp/cdp.zig");