mirror of
https://github.com/lightpanda-io/browser.git
synced 2025-10-29 07:03:29 +00:00
cdp: send I/O next read before executing current cmd
Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
46
src/msg.zig
46
src/msg.zig
@@ -55,20 +55,12 @@ pub const MsgBuffer = struct {
|
||||
}
|
||||
|
||||
// read input
|
||||
// - `do_func` is a callback to execute on each message of the input
|
||||
// - `data` is an arbitrary user data that will be forwarded to the do_func callback
|
||||
pub fn read(
|
||||
self: *MsgBuffer,
|
||||
alloc: std.mem.Allocator,
|
||||
input: []const u8,
|
||||
data: anytype,
|
||||
comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void,
|
||||
) !void {
|
||||
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct {
|
||||
msg: []const u8,
|
||||
left: []const u8,
|
||||
} {
|
||||
var _input = input; // make input writable
|
||||
|
||||
while (true) {
|
||||
var msg: []const u8 = undefined;
|
||||
|
||||
// msg size
|
||||
var msg_size: usize = undefined;
|
||||
if (self.isEmpty()) {
|
||||
@@ -102,7 +94,7 @@ pub const MsgBuffer = struct {
|
||||
if (new_pos > self.buf.len) {
|
||||
// we want to realloc at least:
|
||||
// - a size big enough to fit the entire input (ie. new_pos)
|
||||
// - a size big enough (ie. current size + starting size)
|
||||
// - a size big enough (ie. current msg size + starting buffer size)
|
||||
// to avoid multiple reallocation
|
||||
const new_size = @max(self.buf.len + self.size, new_pos);
|
||||
// resize the MsgBuffer to fit
|
||||
@@ -116,7 +108,7 @@ pub const MsgBuffer = struct {
|
||||
self.pos = new_pos;
|
||||
|
||||
// if multipart is not finished, go fetch the next input
|
||||
if (!self.isFinished()) return;
|
||||
if (!self.isFinished()) return error.MsgMultipart;
|
||||
|
||||
// otherwhise multipart is finished, use its buffer as input
|
||||
_input = self.buf[0..self.pos];
|
||||
@@ -124,20 +116,11 @@ pub const MsgBuffer = struct {
|
||||
}
|
||||
|
||||
// handle several JSON msg in 1 read
|
||||
const is_combined = _input.len > msg_size;
|
||||
msg = _input[0..msg_size];
|
||||
if (is_combined) {
|
||||
_input = _input[msg_size..];
|
||||
}
|
||||
|
||||
try @call(.auto, do_func, .{ data, msg });
|
||||
|
||||
if (!is_combined) break;
|
||||
}
|
||||
return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] };
|
||||
}
|
||||
};
|
||||
|
||||
fn doTest(nb: *u8, _: []const u8) anyerror!void {
|
||||
fn doTest(nb: *u8) void {
|
||||
nb.* += 1;
|
||||
}
|
||||
|
||||
@@ -171,12 +154,19 @@ test "MsgBuffer" {
|
||||
.{ .input = "2:ok9:multi", .nb = 1 },
|
||||
.{ .input = "part", .nb = 1 },
|
||||
};
|
||||
var nb: u8 = undefined;
|
||||
var msg_buf = try MsgBuffer.init(alloc, 10);
|
||||
defer msg_buf.deinit(alloc);
|
||||
for (cases) |case| {
|
||||
nb = 0;
|
||||
try msg_buf.read(alloc, case.input, &nb, doTest);
|
||||
var nb: u8 = 0;
|
||||
var input: []const u8 = case.input;
|
||||
while (input.len > 0) {
|
||||
const parts = msg_buf.read(alloc, input) catch |err| {
|
||||
if (err == error.MsgMultipart) break; // go to the next case input
|
||||
return err;
|
||||
};
|
||||
nb += 1;
|
||||
input = parts.left;
|
||||
}
|
||||
try std.testing.expect(nb == case.nb);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,17 +128,6 @@ pub const Ctx = struct {
|
||||
return;
|
||||
}
|
||||
|
||||
// input
|
||||
const input = self.read_buf[0..size];
|
||||
|
||||
// read and execute input
|
||||
self.msg_buf.read(self.alloc(), input, self, Ctx.do) catch |err| {
|
||||
if (err != error.Closed) {
|
||||
log.err("do error: {any}", .{err});
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
// set connection timestamp
|
||||
self.last_active = std.time.Instant.now() catch |err| {
|
||||
log.err("read timestamp error: {any}", .{err});
|
||||
@@ -154,6 +143,26 @@ pub const Ctx = struct {
|
||||
self.conn_socket,
|
||||
self.read_buf,
|
||||
);
|
||||
|
||||
// read and execute input
|
||||
var input: []const u8 = self.read_buf[0..size];
|
||||
while (input.len > 0) {
|
||||
const parts = self.msg_buf.read(self.alloc(), input) catch |err| {
|
||||
if (err == error.MsgMultipart) {
|
||||
return;
|
||||
} else {
|
||||
log.err("msg read error: {any}", .{err});
|
||||
return;
|
||||
}
|
||||
};
|
||||
input = parts.left;
|
||||
// execute
|
||||
self.do(parts.msg) catch |err| {
|
||||
if (err != error.Closed) {
|
||||
log.err("do error: {any}", .{err});
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn timeoutCbk(self: *Ctx, completion: *Completion, result: TimeoutError!void) void {
|
||||
|
||||
Reference in New Issue
Block a user