From bafdca3ffa61bade57a55097a295fc2830912b61 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 22 May 2024 16:24:39 +0200 Subject: [PATCH] MsgBuffer to handle both combined and multipart read Signed-off-by: Francis Bouvier --- src/server.zig | 186 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 144 insertions(+), 42 deletions(-) diff --git a/src/server.zig b/src/server.zig index 721c2b2f..65d32543 100644 --- a/src/server.zig +++ b/src/server.zig @@ -18,6 +18,8 @@ const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; // I/O Recv // -------- +const BufReadSize = 1024; + pub const Cmd = struct { // internal fields @@ -25,6 +27,8 @@ pub const Cmd = struct { buf: []u8, // only for read operations err: ?Error = null, + msg_buf: *MsgBuffer, + // CDP state: cdp.State = .{}, @@ -57,7 +61,7 @@ pub const Cmd = struct { } // read and execute input - readInput(input, Cmd.do, self) catch unreachable; + self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch unreachable; // continue receving incomming messages asynchronously self.loop().io.recv(*Cmd, self, cbk, completion, self.socket, self.buf); @@ -74,7 +78,7 @@ pub const Cmd = struct { return self.browser.currentSession().loop; } - fn do(self: *Cmd, cmd: []const u8) !void { + fn do(self: *Cmd, cmd: []const u8) anyerror!void { const res = try cdp.do(self.alloc(), cmd, self); // send result @@ -85,60 +89,153 @@ pub const Cmd = struct { } }; -fn readInput(buf: []const u8, func: anytype, data: anytype) !void { - var input = buf; +/// MsgBuffer return messages from a raw text read stream, +/// according to the following format `:`. +/// It handles both: +/// - combined messages in one read +/// - single message in several read (multipart) +/// It is safe (and good practice) to reuse the same MsgBuffer +/// on several reads of the same stream. +const MsgBuffer = struct { + size: usize = 0, + buf: []u8, + pos: usize = 0, - while (true) { - var cmd: []const u8 = undefined; - - // size msg - var size_msg: usize = undefined; - - // parse json msg size - const size_pos = std.mem.indexOfScalar(u8, input, ':').?; - std.log.debug("msg size pos: {d}", .{size_pos}); - const size_str = input[0..size_pos]; - input = input[size_pos + 1 ..]; - size_msg = try std.fmt.parseInt(u32, size_str, 10); - // } - std.log.debug("msg size: {d}", .{size_msg}); - - // handle several JSON msg in 1 read - const is_multi = input.len > size_msg; - std.log.debug("is_multi: {any}", .{is_multi}); - cmd = input[0..size_msg]; - std.log.debug("cmd: {s}", .{cmd[0..@min(BufReadSize, size_msg)]}); - if (is_multi) { - input = input[size_msg..]; - std.log.debug("rest: {s}", .{input}); - } - - try @call(.auto, func, .{ data, cmd }); - - if (!is_multi) break; - - // TODO: handle 1 read smaller than a complete JSON msg + fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer { + const buf = try alloc.alloc(u8, size); + return .{ .buf = buf }; } -} + + fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void { + alloc.free(self.buf); + } + + fn isFinished(self: *MsgBuffer) bool { + return self.pos >= self.size; + } + + fn isEmpty(self: MsgBuffer) bool { + return self.size == 0 and self.pos == 0; + } + + fn reset(self: *MsgBuffer) void { + self.size = 0; + self.pos = 0; + } + + // read input + // - `do_func` is a callback to execute on each message of the input + // - `data` is a arbitrary payload that will be passed to the callback along with + // the message itself + fn read( + self: *MsgBuffer, + alloc: std.mem.Allocator, + input: []const u8, + data: anytype, + comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void, + ) !void { + var _input = input; // make input writable + + while (true) { + var msg: []const u8 = undefined; + + // msg size + var msg_size: usize = undefined; + if (self.isEmpty()) { + // parse msg size metadata + const size_pos = std.mem.indexOfScalar(u8, _input, ':').?; + const size_str = _input[0..size_pos]; + msg_size = try std.fmt.parseInt(u32, size_str, 10); + _input = _input[size_pos + 1 ..]; + } else { + msg_size = self.size; + } + + // multipart + const is_multipart = !self.isEmpty() or _input.len < msg_size; + if (is_multipart) { + + // set msg size on empty MsgBuffer + if (self.isEmpty()) { + self.size = msg_size; + } + + // get the new position of the cursor + const new_pos = self.pos + _input.len; + + // check if the current input can fit in MsgBuffer + if (new_pos > self.buf.len) { + // max_size is the max between msg size and current new cursor position + const max_size = @max(self.size, new_pos); + // resize the MsgBuffer to fit + self.buf = try alloc.realloc(self.buf, max_size); + } + + // copy the current input into MsgBuffer + @memcpy(self.buf[self.pos..new_pos], _input[0..]); + + // set the new cursor position + self.pos = new_pos; + + // if multipart is not finished, go fetch the next input + if (!self.isFinished()) return; + + // otherwhise multipart is finished, use its buffer as input + _input = self.buf[0..self.pos]; + self.reset(); + } + + // handle several JSON msg in 1 read + const is_combined = _input.len > msg_size; + msg = _input[0..msg_size]; + std.log.debug("msg: {s}", .{msg[0..@min(BufReadSize, msg_size)]}); + if (is_combined) { + _input = _input[msg_size..]; + } + + try @call(.auto, do_func, .{ data, msg }); + + if (!is_combined) break; + } + } +}; fn doTest(nb: *u8, _: []const u8) anyerror!void { nb.* += 1; } -test { +test "MsgBuffer" { const Case = struct { input: []const u8, nb: u8, }; + const alloc = std.testing.allocator; const cases = [_]Case{ // simple .{ .input = "2:ok", .nb = 1 }, - // multi + // combined .{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here + // multipart + .{ .input = "9:multi", .nb = 0 }, + .{ .input = "part", .nb = 1 }, + // multipart & combined + .{ .input = "9:multi", .nb = 0 }, + .{ .input = "part2:ok", .nb = 2 }, + // several multipart + .{ .input = "23:multi", .nb = 0 }, + .{ .input = "several", .nb = 0 }, + .{ .input = "complex", .nb = 0 }, + .{ .input = "part", .nb = 1 }, + // combined & multipart + .{ .input = "2:ok9:multi", .nb = 1 }, + .{ .input = "part", .nb = 1 }, }; + var nb: u8 = undefined; + var msg_buf = try MsgBuffer.init(alloc, 10); + defer msg_buf.deinit(alloc); for (cases) |case| { - var nb: u8 = 0; - try readInput(case.input, doTest, &nb); + nb = 0; + try msg_buf.read(alloc, case.input, &nb, doTest); try std.testing.expect(nb == case.nb); } } @@ -232,14 +329,20 @@ const Accept = struct { // ------ pub fn listen(browser: *Browser, socket: std.os.socket_t) anyerror!void { + const loop = browser.currentSession().loop; + + // MsgBuffer + var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize); + defer msg_buf.deinit(loop.alloc); // create I/O contexts and callbacks // for accepting connections and receving messages - var input: [1024]u8 = undefined; + var ctxInput: [BufReadSize]u8 = undefined; var cmd = Cmd{ .browser = browser, .socket = undefined, - .buf = &input, + .buf = &ctxInput, + .msg_buf = &msg_buf, }; var accept = Accept{ .cmd = &cmd, @@ -247,7 +350,6 @@ pub fn listen(browser: *Browser, socket: std.os.socket_t) anyerror!void { }; // accepting connection asynchronously on internal server - const loop = browser.currentSession().loop; var completion: Completion = undefined; loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket);