From f6f5ec5eb3fd3b50ee5504c956c613dc6c80b16f Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Tue, 5 Nov 2024 21:46:52 +0100 Subject: [PATCH] server: add cancel current recv before accepting new connection Only on Linux. On MacOS cancel is not supported for now and we do not have any problem with the current recv operation on a closed socket. Signed-off-by: Francis Bouvier --- src/server.zig | 121 +++++++++++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 49 deletions(-) diff --git a/src/server.zig b/src/server.zig index 0e52689d..bfba7ae8 100644 --- a/src/server.zig +++ b/src/server.zig @@ -17,6 +17,7 @@ // along with this program. If not, see . const std = @import("std"); +const builtin = @import("builtin"); const jsruntime = @import("jsruntime"); const Completion = jsruntime.IO.Completion; @@ -24,6 +25,7 @@ const AcceptError = jsruntime.IO.AcceptError; const RecvError = jsruntime.IO.RecvError; const SendError = jsruntime.IO.SendError; const CloseError = jsruntime.IO.CloseError; +const CancelError = jsruntime.IO.CancelError; const TimeoutError = jsruntime.IO.TimeoutError; const MsgBuffer = @import("msg.zig").MsgBuffer; @@ -31,12 +33,13 @@ const Browser = @import("browser/browser.zig").Browser; const cdp = @import("cdp/cdp.zig"); const NoError = error{NoError}; -const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError; +const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError; const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; const TimeoutCheck = std.time.ns_per_ms * 100; const log = std.log.scoped(.server); +const isLinux = builtin.target.os.tag == .linux; // I/O Main // -------- @@ -55,6 +58,7 @@ pub const Ctx = struct { err: ?Error = null, // I/O fields + accept_completion: *Completion, conn_completion: *Completion, timeout_completion: *Completion, timeout: u64, @@ -76,13 +80,14 @@ pub const Ctx = struct { completion: *Completion, result: AcceptError!std.posix.socket_t, ) void { - std.debug.assert(completion == self.conn_completion); + std.debug.assert(completion == self.acceptCompletion()); self.conn_socket = result catch |err| { log.err("accept error: {any}", .{err}); self.err = err; return; }; + log.info("client connected", .{}); // set connection timestamp and timeout self.last_active = std.time.Instant.now() catch |err| { @@ -112,6 +117,10 @@ pub const Ctx = struct { std.debug.assert(completion == self.conn_completion); const size = result catch |err| { + if (err == error.Canceled) { + log.debug("read canceled", .{}); + return; + } log.err("read error: {any}", .{err}); self.err = err; return; @@ -188,21 +197,9 @@ pub const Ctx = struct { }; if (now.since(self.last_active.?) > self.timeout) { - // closing - log.debug("conn timeout, closing...", .{}); - - // NOTE: we should cancel the current read - // but it seems that's just closing the connection is enough - // (and cancel does not work on MacOS) - // close current connection - self.loop.io.close( - *Ctx, - self, - Ctx.closeCbk, - self.timeout_completion, - self.conn_socket, - ); + log.debug("conn timeout, closing...", .{}); + self.cancelAndClose(); return; } @@ -216,37 +213,17 @@ pub const Ctx = struct { ); } - fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void { - _ = completion; - // NOTE: completion can be either self.conn_completion or self.timeout_completion + fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void { + std.debug.assert(completion == self.accept_completion); _ = result catch |err| { - log.err("close error: {any}", .{err}); + log.err("cancel error: {any}", .{err}); self.err = err; return; }; + log.debug("cancel done", .{}); - // conn is closed - self.last_active = null; - - // restart a new browser session in case of re-connect - if (!self.sessionNew) { - self.newSession() catch |err| { - log.err("new session error: {any}", .{err}); - return; - }; - } - - log.info("accepting new conn...", .{}); - - // continue accepting incoming requests - self.loop.io.accept( - *Ctx, - self, - Ctx.acceptCbk, - self.conn_completion, - self.accept_socket, - ); + self.close(); } // shortcuts @@ -267,6 +244,15 @@ pub const Ctx = struct { return self.browser.session.env; } + inline fn acceptCompletion(self: *Ctx) *Completion { + // NOTE: the logical completion to use here is the accept_completion + // as the pipe_connection can be used simulteanously by a recv I/O operation. + // But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic + // so we use the pipe_connection to avoid this problem + if (isLinux) return self.accept_completion; + return self.conn_completion; + } + // actions // ------- @@ -276,13 +262,7 @@ pub const Ctx = struct { if (std.mem.eql(u8, cmd, "close")) { // close connection log.info("close cmd, closing conn...", .{}); - self.loop.io.close( - *Ctx, - self, - Ctx.closeCbk, - self.conn_completion, - self.conn_socket, - ); + self.cancelAndClose(); return error.Closed; } @@ -307,6 +287,47 @@ pub const Ctx = struct { } } + fn cancelAndClose(self: *Ctx) void { + if (isLinux) { // cancel is only available on Linux + self.loop.io.cancel( + *Ctx, + self, + Ctx.cancelCbk, + self.accept_completion, + self.conn_completion, + ); + } else { + self.close(); + } + } + + fn close(self: *Ctx) void { + std.posix.close(self.conn_socket); + + // conn is closed + log.debug("connection closed", .{}); + self.last_active = null; + + // restart a new browser session in case of re-connect + if (!self.sessionNew) { + self.newSession() catch |err| { + log.err("new session error: {any}", .{err}); + return; + }; + } + + log.info("accepting new conn...", .{}); + + // continue accepting incoming requests + self.loop.io.accept( + *Ctx, + self, + Ctx.acceptCbk, + self.acceptCompletion(), + self.accept_socket, + ); + } + fn newSession(self: *Ctx) !void { try self.browser.newSession(self.alloc(), self.loop); try self.browser.session.initInspector( @@ -430,6 +451,7 @@ pub fn listen( defer msg_buf.deinit(loop.alloc); // create I/O completions + var accept_completion: Completion = undefined; var conn_completion: Completion = undefined; var timeout_completion: Completion = undefined; @@ -443,6 +465,7 @@ pub fn listen( .msg_buf = &msg_buf, .accept_socket = server_socket, .timeout = timeout, + .accept_completion = &accept_completion, .conn_completion = &conn_completion, .timeout_completion = &timeout_completion, }; @@ -454,7 +477,7 @@ pub fn listen( // accepting connection asynchronously on internal server log.info("accepting new conn...", .{}); - loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket); + loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket); // infinite loop on I/O events, either: // - cmd from incoming connection on server socket