mirror of
https://github.com/lightpanda-io/browser.git
synced 2025-10-28 22:53:28 +00:00
MsgBuffer to handle both combined and multipart read
Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
168
src/server.zig
168
src/server.zig
@@ -18,6 +18,8 @@ const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
|
||||
// I/O Recv
|
||||
// --------
|
||||
|
||||
const BufReadSize = 1024;
|
||||
|
||||
pub const Cmd = struct {
|
||||
|
||||
// internal fields
|
||||
@@ -25,6 +27,8 @@ pub const Cmd = struct {
|
||||
buf: []u8, // only for read operations
|
||||
err: ?Error = null,
|
||||
|
||||
msg_buf: *MsgBuffer,
|
||||
|
||||
// CDP
|
||||
state: cdp.State = .{},
|
||||
|
||||
@@ -57,7 +61,7 @@ pub const Cmd = struct {
|
||||
}
|
||||
|
||||
// read and execute input
|
||||
readInput(input, Cmd.do, self) catch unreachable;
|
||||
self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch unreachable;
|
||||
|
||||
// continue receving incomming messages asynchronously
|
||||
self.loop().io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
|
||||
@@ -74,7 +78,7 @@ pub const Cmd = struct {
|
||||
return self.browser.currentSession().loop;
|
||||
}
|
||||
|
||||
fn do(self: *Cmd, cmd: []const u8) !void {
|
||||
fn do(self: *Cmd, cmd: []const u8) anyerror!void {
|
||||
const res = try cdp.do(self.alloc(), cmd, self);
|
||||
|
||||
// send result
|
||||
@@ -85,60 +89,153 @@ pub const Cmd = struct {
|
||||
}
|
||||
};
|
||||
|
||||
fn readInput(buf: []const u8, func: anytype, data: anytype) !void {
|
||||
var input = buf;
|
||||
/// MsgBuffer return messages from a raw text read stream,
|
||||
/// according to the following format `<msg_size>:<msg>`.
|
||||
/// It handles both:
|
||||
/// - combined messages in one read
|
||||
/// - single message in several read (multipart)
|
||||
/// It is safe (and good practice) to reuse the same MsgBuffer
|
||||
/// on several reads of the same stream.
|
||||
const MsgBuffer = struct {
|
||||
size: usize = 0,
|
||||
buf: []u8,
|
||||
pos: usize = 0,
|
||||
|
||||
fn init(alloc: std.mem.Allocator, size: usize) std.mem.Allocator.Error!MsgBuffer {
|
||||
const buf = try alloc.alloc(u8, size);
|
||||
return .{ .buf = buf };
|
||||
}
|
||||
|
||||
fn deinit(self: MsgBuffer, alloc: std.mem.Allocator) void {
|
||||
alloc.free(self.buf);
|
||||
}
|
||||
|
||||
fn isFinished(self: *MsgBuffer) bool {
|
||||
return self.pos >= self.size;
|
||||
}
|
||||
|
||||
fn isEmpty(self: MsgBuffer) bool {
|
||||
return self.size == 0 and self.pos == 0;
|
||||
}
|
||||
|
||||
fn reset(self: *MsgBuffer) void {
|
||||
self.size = 0;
|
||||
self.pos = 0;
|
||||
}
|
||||
|
||||
// read input
|
||||
// - `do_func` is a callback to execute on each message of the input
|
||||
// - `data` is a arbitrary payload that will be passed to the callback along with
|
||||
// the message itself
|
||||
fn read(
|
||||
self: *MsgBuffer,
|
||||
alloc: std.mem.Allocator,
|
||||
input: []const u8,
|
||||
data: anytype,
|
||||
comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void,
|
||||
) !void {
|
||||
var _input = input; // make input writable
|
||||
|
||||
while (true) {
|
||||
var cmd: []const u8 = undefined;
|
||||
var msg: []const u8 = undefined;
|
||||
|
||||
// size msg
|
||||
var size_msg: usize = undefined;
|
||||
// msg size
|
||||
var msg_size: usize = undefined;
|
||||
if (self.isEmpty()) {
|
||||
// parse msg size metadata
|
||||
const size_pos = std.mem.indexOfScalar(u8, _input, ':').?;
|
||||
const size_str = _input[0..size_pos];
|
||||
msg_size = try std.fmt.parseInt(u32, size_str, 10);
|
||||
_input = _input[size_pos + 1 ..];
|
||||
} else {
|
||||
msg_size = self.size;
|
||||
}
|
||||
|
||||
// parse json msg size
|
||||
const size_pos = std.mem.indexOfScalar(u8, input, ':').?;
|
||||
std.log.debug("msg size pos: {d}", .{size_pos});
|
||||
const size_str = input[0..size_pos];
|
||||
input = input[size_pos + 1 ..];
|
||||
size_msg = try std.fmt.parseInt(u32, size_str, 10);
|
||||
// }
|
||||
std.log.debug("msg size: {d}", .{size_msg});
|
||||
// multipart
|
||||
const is_multipart = !self.isEmpty() or _input.len < msg_size;
|
||||
if (is_multipart) {
|
||||
|
||||
// set msg size on empty MsgBuffer
|
||||
if (self.isEmpty()) {
|
||||
self.size = msg_size;
|
||||
}
|
||||
|
||||
// get the new position of the cursor
|
||||
const new_pos = self.pos + _input.len;
|
||||
|
||||
// check if the current input can fit in MsgBuffer
|
||||
if (new_pos > self.buf.len) {
|
||||
// max_size is the max between msg size and current new cursor position
|
||||
const max_size = @max(self.size, new_pos);
|
||||
// resize the MsgBuffer to fit
|
||||
self.buf = try alloc.realloc(self.buf, max_size);
|
||||
}
|
||||
|
||||
// copy the current input into MsgBuffer
|
||||
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
|
||||
|
||||
// set the new cursor position
|
||||
self.pos = new_pos;
|
||||
|
||||
// if multipart is not finished, go fetch the next input
|
||||
if (!self.isFinished()) return;
|
||||
|
||||
// otherwhise multipart is finished, use its buffer as input
|
||||
_input = self.buf[0..self.pos];
|
||||
self.reset();
|
||||
}
|
||||
|
||||
// handle several JSON msg in 1 read
|
||||
const is_multi = input.len > size_msg;
|
||||
std.log.debug("is_multi: {any}", .{is_multi});
|
||||
cmd = input[0..size_msg];
|
||||
std.log.debug("cmd: {s}", .{cmd[0..@min(BufReadSize, size_msg)]});
|
||||
if (is_multi) {
|
||||
input = input[size_msg..];
|
||||
std.log.debug("rest: {s}", .{input});
|
||||
const is_combined = _input.len > msg_size;
|
||||
msg = _input[0..msg_size];
|
||||
std.log.debug("msg: {s}", .{msg[0..@min(BufReadSize, msg_size)]});
|
||||
if (is_combined) {
|
||||
_input = _input[msg_size..];
|
||||
}
|
||||
|
||||
try @call(.auto, func, .{ data, cmd });
|
||||
try @call(.auto, do_func, .{ data, msg });
|
||||
|
||||
if (!is_multi) break;
|
||||
|
||||
// TODO: handle 1 read smaller than a complete JSON msg
|
||||
if (!is_combined) break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
fn doTest(nb: *u8, _: []const u8) anyerror!void {
|
||||
nb.* += 1;
|
||||
}
|
||||
|
||||
test {
|
||||
test "MsgBuffer" {
|
||||
const Case = struct {
|
||||
input: []const u8,
|
||||
nb: u8,
|
||||
};
|
||||
const alloc = std.testing.allocator;
|
||||
const cases = [_]Case{
|
||||
// simple
|
||||
.{ .input = "2:ok", .nb = 1 },
|
||||
// multi
|
||||
// combined
|
||||
.{ .input = "2:ok3:foo7:bar2:ok", .nb = 3 }, // "bar2:ok" is a message, no need to escape "2:" here
|
||||
// multipart
|
||||
.{ .input = "9:multi", .nb = 0 },
|
||||
.{ .input = "part", .nb = 1 },
|
||||
// multipart & combined
|
||||
.{ .input = "9:multi", .nb = 0 },
|
||||
.{ .input = "part2:ok", .nb = 2 },
|
||||
// several multipart
|
||||
.{ .input = "23:multi", .nb = 0 },
|
||||
.{ .input = "several", .nb = 0 },
|
||||
.{ .input = "complex", .nb = 0 },
|
||||
.{ .input = "part", .nb = 1 },
|
||||
// combined & multipart
|
||||
.{ .input = "2:ok9:multi", .nb = 1 },
|
||||
.{ .input = "part", .nb = 1 },
|
||||
};
|
||||
var nb: u8 = undefined;
|
||||
var msg_buf = try MsgBuffer.init(alloc, 10);
|
||||
defer msg_buf.deinit(alloc);
|
||||
for (cases) |case| {
|
||||
var nb: u8 = 0;
|
||||
try readInput(case.input, doTest, &nb);
|
||||
nb = 0;
|
||||
try msg_buf.read(alloc, case.input, &nb, doTest);
|
||||
try std.testing.expect(nb == case.nb);
|
||||
}
|
||||
}
|
||||
@@ -232,14 +329,20 @@ const Accept = struct {
|
||||
// ------
|
||||
|
||||
pub fn listen(browser: *Browser, socket: std.os.socket_t) anyerror!void {
|
||||
const loop = browser.currentSession().loop;
|
||||
|
||||
// MsgBuffer
|
||||
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize);
|
||||
defer msg_buf.deinit(loop.alloc);
|
||||
|
||||
// create I/O contexts and callbacks
|
||||
// for accepting connections and receving messages
|
||||
var input: [1024]u8 = undefined;
|
||||
var ctxInput: [BufReadSize]u8 = undefined;
|
||||
var cmd = Cmd{
|
||||
.browser = browser,
|
||||
.socket = undefined,
|
||||
.buf = &input,
|
||||
.buf = &ctxInput,
|
||||
.msg_buf = &msg_buf,
|
||||
};
|
||||
var accept = Accept{
|
||||
.cmd = &cmd,
|
||||
@@ -247,7 +350,6 @@ pub fn listen(browser: *Browser, socket: std.os.socket_t) anyerror!void {
|
||||
};
|
||||
|
||||
// accepting connection asynchronously on internal server
|
||||
const loop = browser.currentSession().loop;
|
||||
var completion: Completion = undefined;
|
||||
loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user