Merge pull request #295 from lightpanda-io/fix_cdp_full_async

Fix cdp full async
This commit is contained in:
Francis Bouvier
2024-11-06 18:14:43 +01:00
committed by GitHub
2 changed files with 77 additions and 49 deletions

View File

@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const jsruntime = @import("jsruntime"); const jsruntime = @import("jsruntime");
const Completion = jsruntime.IO.Completion; const Completion = jsruntime.IO.Completion;
@@ -24,6 +25,7 @@ const AcceptError = jsruntime.IO.AcceptError;
const RecvError = jsruntime.IO.RecvError; const RecvError = jsruntime.IO.RecvError;
const SendError = jsruntime.IO.SendError; const SendError = jsruntime.IO.SendError;
const CloseError = jsruntime.IO.CloseError; const CloseError = jsruntime.IO.CloseError;
const CancelError = jsruntime.IO.CancelError;
const TimeoutError = jsruntime.IO.TimeoutError; const TimeoutError = jsruntime.IO.TimeoutError;
const MsgBuffer = @import("msg.zig").MsgBuffer; const MsgBuffer = @import("msg.zig").MsgBuffer;
@@ -31,12 +33,13 @@ const Browser = @import("browser/browser.zig").Browser;
const cdp = @import("cdp/cdp.zig"); const cdp = @import("cdp/cdp.zig");
const NoError = error{NoError}; const NoError = error{NoError};
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError; const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError;
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
const TimeoutCheck = std.time.ns_per_ms * 100; const TimeoutCheck = std.time.ns_per_ms * 100;
const log = std.log.scoped(.server); const log = std.log.scoped(.server);
const isLinux = builtin.target.os.tag == .linux;
// I/O Main // I/O Main
// -------- // --------
@@ -55,6 +58,7 @@ pub const Ctx = struct {
err: ?Error = null, err: ?Error = null,
// I/O fields // I/O fields
accept_completion: *Completion,
conn_completion: *Completion, conn_completion: *Completion,
timeout_completion: *Completion, timeout_completion: *Completion,
timeout: u64, timeout: u64,
@@ -76,12 +80,14 @@ pub const Ctx = struct {
completion: *Completion, completion: *Completion,
result: AcceptError!std.posix.socket_t, result: AcceptError!std.posix.socket_t,
) void { ) void {
std.debug.assert(completion == self.conn_completion); std.debug.assert(completion == self.acceptCompletion());
self.conn_socket = result catch |err| { self.conn_socket = result catch |err| {
log.err("accept error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
log.info("client connected", .{});
// set connection timestamp and timeout // set connection timestamp and timeout
self.last_active = std.time.Instant.now() catch |err| { self.last_active = std.time.Instant.now() catch |err| {
@@ -111,6 +117,11 @@ pub const Ctx = struct {
std.debug.assert(completion == self.conn_completion); std.debug.assert(completion == self.conn_completion);
const size = result catch |err| { const size = result catch |err| {
if (err == error.Canceled) {
log.debug("read canceled", .{});
return;
}
log.err("read error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
@@ -169,6 +180,7 @@ pub const Ctx = struct {
std.debug.assert(completion == self.timeout_completion); std.debug.assert(completion == self.timeout_completion);
_ = result catch |err| { _ = result catch |err| {
log.err("timeout error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
@@ -185,21 +197,9 @@ pub const Ctx = struct {
}; };
if (now.since(self.last_active.?) > self.timeout) { if (now.since(self.last_active.?) > self.timeout) {
// closing
log.debug("conn timeout, closing...", .{});
// NOTE: we should cancel the current read
// but it seems that's just closing the connection is enough
// (and cancel does not work on MacOS)
// close current connection // close current connection
self.loop.io.close( log.debug("conn timeout, closing...", .{});
*Ctx, self.cancelAndClose();
self,
Ctx.closeCbk,
self.timeout_completion,
self.conn_socket,
);
return; return;
} }
@@ -213,36 +213,17 @@ pub const Ctx = struct {
); );
} }
fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void { fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
_ = completion; std.debug.assert(completion == self.accept_completion);
// NOTE: completion can be either self.conn_completion or self.timeout_completion
_ = result catch |err| { _ = result catch |err| {
log.err("cancel error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
log.debug("cancel done", .{});
// conn is closed self.close();
self.last_active = null;
// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}
log.info("accepting new conn...", .{});
// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.conn_completion,
self.accept_socket,
);
} }
// shortcuts // shortcuts
@@ -263,6 +244,15 @@ pub const Ctx = struct {
return self.browser.session.env; return self.browser.session.env;
} }
inline fn acceptCompletion(self: *Ctx) *Completion {
// NOTE: the logical completion to use here is the accept_completion
// as the pipe_connection can be used simulteanously by a recv I/O operation.
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
// so we use the pipe_connection to avoid this problem
if (isLinux) return self.accept_completion;
return self.conn_completion;
}
// actions // actions
// ------- // -------
@@ -272,13 +262,7 @@ pub const Ctx = struct {
if (std.mem.eql(u8, cmd, "close")) { if (std.mem.eql(u8, cmd, "close")) {
// close connection // close connection
log.info("close cmd, closing conn...", .{}); log.info("close cmd, closing conn...", .{});
self.loop.io.close( self.cancelAndClose();
*Ctx,
self,
Ctx.closeCbk,
self.conn_completion,
self.conn_socket,
);
return error.Closed; return error.Closed;
} }
@@ -303,6 +287,47 @@ pub const Ctx = struct {
} }
} }
fn cancelAndClose(self: *Ctx) void {
if (isLinux) { // cancel is only available on Linux
self.loop.io.cancel(
*Ctx,
self,
Ctx.cancelCbk,
self.accept_completion,
self.conn_completion,
);
} else {
self.close();
}
}
fn close(self: *Ctx) void {
std.posix.close(self.conn_socket);
// conn is closed
log.debug("connection closed", .{});
self.last_active = null;
// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}
log.info("accepting new conn...", .{});
// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.acceptCompletion(),
self.accept_socket,
);
}
fn newSession(self: *Ctx) !void { fn newSession(self: *Ctx) !void {
try self.browser.newSession(self.alloc(), self.loop); try self.browser.newSession(self.alloc(), self.loop);
try self.browser.session.initInspector( try self.browser.session.initInspector(
@@ -388,6 +413,7 @@ const Send = struct {
fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void { fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void {
_ = result catch |err| { _ = result catch |err| {
log.err("send error: {any}", .{err});
self.ctx.err = err; self.ctx.err = err;
return; return;
}; };
@@ -425,6 +451,7 @@ pub fn listen(
defer msg_buf.deinit(loop.alloc); defer msg_buf.deinit(loop.alloc);
// create I/O completions // create I/O completions
var accept_completion: Completion = undefined;
var conn_completion: Completion = undefined; var conn_completion: Completion = undefined;
var timeout_completion: Completion = undefined; var timeout_completion: Completion = undefined;
@@ -438,6 +465,7 @@ pub fn listen(
.msg_buf = &msg_buf, .msg_buf = &msg_buf,
.accept_socket = server_socket, .accept_socket = server_socket,
.timeout = timeout, .timeout = timeout,
.accept_completion = &accept_completion,
.conn_completion = &conn_completion, .conn_completion = &conn_completion,
.timeout_completion = &timeout_completion, .timeout_completion = &timeout_completion,
}; };
@@ -449,7 +477,7 @@ pub fn listen(
// accepting connection asynchronously on internal server // accepting connection asynchronously on internal server
log.info("accepting new conn...", .{}); log.info("accepting new conn...", .{});
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket); loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);
// infinite loop on I/O events, either: // infinite loop on I/O events, either:
// - cmd from incoming connection on server socket // - cmd from incoming connection on server socket