Merge pull request #278 from lightpanda-io/cdp_full_async

Cdp full async
This commit is contained in:
Francis Bouvier
2024-11-01 18:14:21 +01:00
committed by GitHub
5 changed files with 95 additions and 105 deletions

View File

@@ -226,8 +226,7 @@ pub fn sendEvent(
const resp = Resp{ .method = name, .params = params, .sessionId = sessionID };
const event_msg = try stringify(alloc, resp);
defer alloc.free(event_msg);
try server.sendSync(ctx, event_msg);
try server.sendAsync(ctx, event_msg);
}
fn getParams(

View File

@@ -331,8 +331,7 @@ fn navigate(
.loaderId = ctx.state.loaderID,
};
const res = try result(alloc, msg.id, Resp, resp, msg.sessionID);
defer alloc.free(res);
try server.sendSync(ctx, res);
try server.sendAsync(ctx, res);
// TODO: at this point do we need async the following actions to be async?

View File

@@ -256,8 +256,7 @@ fn disposeBrowserContext(
// output
const res = try result(alloc, msg.id, null, .{}, null);
defer alloc.free(res);
try server.sendSync(ctx, res);
try server.sendAsync(ctx, res);
return error.DisposeBrowserContext;
}
@@ -345,8 +344,7 @@ fn closeTarget(
success: bool = true,
};
const res = try result(alloc, msg.id, Resp, Resp{}, null);
defer alloc.free(res);
try server.sendSync(ctx, res);
try server.sendAsync(ctx, res);
// Inspector.detached event
const InspectorDetached = struct {

View File

@@ -55,20 +55,12 @@ pub const MsgBuffer = struct {
}
// read input
// - `do_func` is a callback to execute on each message of the input
// - `data` is an arbitrary user data that will be forwarded to the do_func callback
pub fn read(
self: *MsgBuffer,
alloc: std.mem.Allocator,
input: []const u8,
data: anytype,
comptime do_func: fn (data: @TypeOf(data), msg: []const u8) anyerror!void,
) !void {
pub fn read(self: *MsgBuffer, alloc: std.mem.Allocator, input: []const u8) !struct {
msg: []const u8,
left: []const u8,
} {
var _input = input; // make input writable
while (true) {
var msg: []const u8 = undefined;
// msg size
var msg_size: usize = undefined;
if (self.isEmpty()) {
@@ -102,7 +94,7 @@ pub const MsgBuffer = struct {
if (new_pos > self.buf.len) {
// we want to realloc at least:
// - a size big enough to fit the entire input (ie. new_pos)
// - a size big enough (ie. current size + starting size)
// - a size big enough (ie. current msg size + starting buffer size)
// to avoid multiple reallocation
const new_size = @max(self.buf.len + self.size, new_pos);
// resize the MsgBuffer to fit
@@ -116,7 +108,7 @@ pub const MsgBuffer = struct {
self.pos = new_pos;
// if multipart is not finished, go fetch the next input
if (!self.isFinished()) return;
if (!self.isFinished()) return error.MsgMultipart;
// otherwhise multipart is finished, use its buffer as input
_input = self.buf[0..self.pos];
@@ -124,20 +116,11 @@ pub const MsgBuffer = struct {
}
// handle several JSON msg in 1 read
const is_combined = _input.len > msg_size;
msg = _input[0..msg_size];
if (is_combined) {
_input = _input[msg_size..];
}
try @call(.auto, do_func, .{ data, msg });
if (!is_combined) break;
}
return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] };
}
};
fn doTest(nb: *u8, _: []const u8) anyerror!void {
fn doTest(nb: *u8) void {
nb.* += 1;
}
@@ -171,12 +154,19 @@ test "MsgBuffer" {
.{ .input = "2:ok9:multi", .nb = 1 },
.{ .input = "part", .nb = 1 },
};
var nb: u8 = undefined;
var msg_buf = try MsgBuffer.init(alloc, 10);
defer msg_buf.deinit(alloc);
for (cases) |case| {
nb = 0;
try msg_buf.read(alloc, case.input, &nb, doTest);
var nb: u8 = 0;
var input: []const u8 = case.input;
while (input.len > 0) {
const parts = msg_buf.read(alloc, input) catch |err| {
if (err == error.MsgMultipart) break; // go to the next case input
return err;
};
nb += 1;
input = parts.left;
}
try std.testing.expect(nb == case.nb);
}
}

View File

@@ -128,17 +128,6 @@ pub const Ctx = struct {
return;
}
// input
const input = self.read_buf[0..size];
// read and execute input
self.msg_buf.read(self.alloc(), input, self, Ctx.do) catch |err| {
if (err != error.Closed) {
log.err("do error: {any}", .{err});
}
return;
};
// set connection timestamp
self.last_active = std.time.Instant.now() catch |err| {
log.err("read timestamp error: {any}", .{err});
@@ -154,6 +143,26 @@ pub const Ctx = struct {
self.conn_socket,
self.read_buf,
);
// read and execute input
var input: []const u8 = self.read_buf[0..size];
while (input.len > 0) {
const parts = self.msg_buf.read(self.alloc(), input) catch |err| {
if (err == error.MsgMultipart) {
return;
} else {
log.err("msg read error: {any}", .{err});
return;
}
};
input = parts.left;
// execute
self.do(parts.msg) catch |err| {
if (err != error.Closed) {
log.err("do error: {any}", .{err});
}
};
}
}
fn timeoutCbk(self: *Ctx, completion: *Completion, result: TimeoutError!void) void {
@@ -327,9 +336,8 @@ pub const Ctx = struct {
tpl,
.{ msg_open, cdp.ContextSessionID },
);
defer ctx.alloc().free(s);
try sendSync(ctx, s);
try sendAsync(ctx, s);
}
pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void {
@@ -392,10 +400,6 @@ pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void {
ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, msg);
}
pub fn sendSync(ctx: *Ctx, msg: []const u8) !void {
_ = try std.posix.write(ctx.conn_socket, msg);
}
// Listen
// ------