server: add cancel current recv before accepting new connection

Only on Linux. On MacOS cancel is not supported for now and
we do not have any problem with the current recv operation
on a closed socket.

Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
Francis Bouvier
2024-11-05 21:46:52 +01:00
parent c74feb9c3a
commit f6f5ec5eb3

View File

@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const jsruntime = @import("jsruntime"); const jsruntime = @import("jsruntime");
const Completion = jsruntime.IO.Completion; const Completion = jsruntime.IO.Completion;
@@ -24,6 +25,7 @@ const AcceptError = jsruntime.IO.AcceptError;
const RecvError = jsruntime.IO.RecvError; const RecvError = jsruntime.IO.RecvError;
const SendError = jsruntime.IO.SendError; const SendError = jsruntime.IO.SendError;
const CloseError = jsruntime.IO.CloseError; const CloseError = jsruntime.IO.CloseError;
const CancelError = jsruntime.IO.CancelError;
const TimeoutError = jsruntime.IO.TimeoutError; const TimeoutError = jsruntime.IO.TimeoutError;
const MsgBuffer = @import("msg.zig").MsgBuffer; const MsgBuffer = @import("msg.zig").MsgBuffer;
@@ -31,12 +33,13 @@ const Browser = @import("browser/browser.zig").Browser;
const cdp = @import("cdp/cdp.zig"); const cdp = @import("cdp/cdp.zig");
const NoError = error{NoError}; const NoError = error{NoError};
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError; const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError;
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
const TimeoutCheck = std.time.ns_per_ms * 100; const TimeoutCheck = std.time.ns_per_ms * 100;
const log = std.log.scoped(.server); const log = std.log.scoped(.server);
const isLinux = builtin.target.os.tag == .linux;
// I/O Main // I/O Main
// -------- // --------
@@ -55,6 +58,7 @@ pub const Ctx = struct {
err: ?Error = null, err: ?Error = null,
// I/O fields // I/O fields
accept_completion: *Completion,
conn_completion: *Completion, conn_completion: *Completion,
timeout_completion: *Completion, timeout_completion: *Completion,
timeout: u64, timeout: u64,
@@ -76,13 +80,14 @@ pub const Ctx = struct {
completion: *Completion, completion: *Completion,
result: AcceptError!std.posix.socket_t, result: AcceptError!std.posix.socket_t,
) void { ) void {
std.debug.assert(completion == self.conn_completion); std.debug.assert(completion == self.acceptCompletion());
self.conn_socket = result catch |err| { self.conn_socket = result catch |err| {
log.err("accept error: {any}", .{err}); log.err("accept error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
log.info("client connected", .{});
// set connection timestamp and timeout // set connection timestamp and timeout
self.last_active = std.time.Instant.now() catch |err| { self.last_active = std.time.Instant.now() catch |err| {
@@ -112,6 +117,10 @@ pub const Ctx = struct {
std.debug.assert(completion == self.conn_completion); std.debug.assert(completion == self.conn_completion);
const size = result catch |err| { const size = result catch |err| {
if (err == error.Canceled) {
log.debug("read canceled", .{});
return;
}
log.err("read error: {any}", .{err}); log.err("read error: {any}", .{err});
self.err = err; self.err = err;
return; return;
@@ -188,21 +197,9 @@ pub const Ctx = struct {
}; };
if (now.since(self.last_active.?) > self.timeout) { if (now.since(self.last_active.?) > self.timeout) {
// closing
log.debug("conn timeout, closing...", .{});
// NOTE: we should cancel the current read
// but it seems that's just closing the connection is enough
// (and cancel does not work on MacOS)
// close current connection // close current connection
self.loop.io.close( log.debug("conn timeout, closing...", .{});
*Ctx, self.cancelAndClose();
self,
Ctx.closeCbk,
self.timeout_completion,
self.conn_socket,
);
return; return;
} }
@@ -216,37 +213,17 @@ pub const Ctx = struct {
); );
} }
fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void { fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
_ = completion; std.debug.assert(completion == self.accept_completion);
// NOTE: completion can be either self.conn_completion or self.timeout_completion
_ = result catch |err| { _ = result catch |err| {
log.err("close error: {any}", .{err}); log.err("cancel error: {any}", .{err});
self.err = err; self.err = err;
return; return;
}; };
log.debug("cancel done", .{});
// conn is closed self.close();
self.last_active = null;
// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}
log.info("accepting new conn...", .{});
// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.conn_completion,
self.accept_socket,
);
} }
// shortcuts // shortcuts
@@ -267,6 +244,15 @@ pub const Ctx = struct {
return self.browser.session.env; return self.browser.session.env;
} }
inline fn acceptCompletion(self: *Ctx) *Completion {
// NOTE: the logical completion to use here is the accept_completion
// as the pipe_connection can be used simulteanously by a recv I/O operation.
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
// so we use the pipe_connection to avoid this problem
if (isLinux) return self.accept_completion;
return self.conn_completion;
}
// actions // actions
// ------- // -------
@@ -276,13 +262,7 @@ pub const Ctx = struct {
if (std.mem.eql(u8, cmd, "close")) { if (std.mem.eql(u8, cmd, "close")) {
// close connection // close connection
log.info("close cmd, closing conn...", .{}); log.info("close cmd, closing conn...", .{});
self.loop.io.close( self.cancelAndClose();
*Ctx,
self,
Ctx.closeCbk,
self.conn_completion,
self.conn_socket,
);
return error.Closed; return error.Closed;
} }
@@ -307,6 +287,47 @@ pub const Ctx = struct {
} }
} }
fn cancelAndClose(self: *Ctx) void {
if (isLinux) { // cancel is only available on Linux
self.loop.io.cancel(
*Ctx,
self,
Ctx.cancelCbk,
self.accept_completion,
self.conn_completion,
);
} else {
self.close();
}
}
fn close(self: *Ctx) void {
std.posix.close(self.conn_socket);
// conn is closed
log.debug("connection closed", .{});
self.last_active = null;
// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}
log.info("accepting new conn...", .{});
// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.acceptCompletion(),
self.accept_socket,
);
}
fn newSession(self: *Ctx) !void { fn newSession(self: *Ctx) !void {
try self.browser.newSession(self.alloc(), self.loop); try self.browser.newSession(self.alloc(), self.loop);
try self.browser.session.initInspector( try self.browser.session.initInspector(
@@ -430,6 +451,7 @@ pub fn listen(
defer msg_buf.deinit(loop.alloc); defer msg_buf.deinit(loop.alloc);
// create I/O completions // create I/O completions
var accept_completion: Completion = undefined;
var conn_completion: Completion = undefined; var conn_completion: Completion = undefined;
var timeout_completion: Completion = undefined; var timeout_completion: Completion = undefined;
@@ -443,6 +465,7 @@ pub fn listen(
.msg_buf = &msg_buf, .msg_buf = &msg_buf,
.accept_socket = server_socket, .accept_socket = server_socket,
.timeout = timeout, .timeout = timeout,
.accept_completion = &accept_completion,
.conn_completion = &conn_completion, .conn_completion = &conn_completion,
.timeout_completion = &timeout_completion, .timeout_completion = &timeout_completion,
}; };
@@ -454,7 +477,7 @@ pub fn listen(
// accepting connection asynchronously on internal server // accepting connection asynchronously on internal server
log.info("accepting new conn...", .{}); log.info("accepting new conn...", .{});
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket); loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);
// infinite loop on I/O events, either: // infinite loop on I/O events, either:
// - cmd from incoming connection on server socket // - cmd from incoming connection on server socket