diff --git a/src/server.zig b/src/server.zig index 6c428a99..bfba7ae8 100644 --- a/src/server.zig +++ b/src/server.zig @@ -17,6 +17,7 @@ // along with this program. If not, see . const std = @import("std"); +const builtin = @import("builtin"); const jsruntime = @import("jsruntime"); const Completion = jsruntime.IO.Completion; @@ -24,6 +25,7 @@ const AcceptError = jsruntime.IO.AcceptError; const RecvError = jsruntime.IO.RecvError; const SendError = jsruntime.IO.SendError; const CloseError = jsruntime.IO.CloseError; +const CancelError = jsruntime.IO.CancelError; const TimeoutError = jsruntime.IO.TimeoutError; const MsgBuffer = @import("msg.zig").MsgBuffer; @@ -31,12 +33,13 @@ const Browser = @import("browser/browser.zig").Browser; const cdp = @import("cdp/cdp.zig"); const NoError = error{NoError}; -const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError; +const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError; const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; const TimeoutCheck = std.time.ns_per_ms * 100; const log = std.log.scoped(.server); +const isLinux = builtin.target.os.tag == .linux; // I/O Main // -------- @@ -55,6 +58,7 @@ pub const Ctx = struct { err: ?Error = null, // I/O fields + accept_completion: *Completion, conn_completion: *Completion, timeout_completion: *Completion, timeout: u64, @@ -76,12 +80,14 @@ pub const Ctx = struct { completion: *Completion, result: AcceptError!std.posix.socket_t, ) void { - std.debug.assert(completion == self.conn_completion); + std.debug.assert(completion == self.acceptCompletion()); self.conn_socket = result catch |err| { + log.err("accept error: {any}", .{err}); self.err = err; return; }; + log.info("client connected", .{}); // set connection timestamp and timeout self.last_active = std.time.Instant.now() catch |err| { @@ -111,6 +117,11 @@ pub const Ctx = struct { std.debug.assert(completion == self.conn_completion); const size = result catch |err| { + if (err == error.Canceled) { + log.debug("read canceled", .{}); + return; + } + log.err("read error: {any}", .{err}); self.err = err; return; }; @@ -169,6 +180,7 @@ pub const Ctx = struct { std.debug.assert(completion == self.timeout_completion); _ = result catch |err| { + log.err("timeout error: {any}", .{err}); self.err = err; return; }; @@ -185,21 +197,9 @@ pub const Ctx = struct { }; if (now.since(self.last_active.?) > self.timeout) { - // closing - log.debug("conn timeout, closing...", .{}); - - // NOTE: we should cancel the current read - // but it seems that's just closing the connection is enough - // (and cancel does not work on MacOS) - // close current connection - self.loop.io.close( - *Ctx, - self, - Ctx.closeCbk, - self.timeout_completion, - self.conn_socket, - ); + log.debug("conn timeout, closing...", .{}); + self.cancelAndClose(); return; } @@ -213,36 +213,17 @@ pub const Ctx = struct { ); } - fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void { - _ = completion; - // NOTE: completion can be either self.conn_completion or self.timeout_completion + fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void { + std.debug.assert(completion == self.accept_completion); _ = result catch |err| { + log.err("cancel error: {any}", .{err}); self.err = err; return; }; + log.debug("cancel done", .{}); - // conn is closed - self.last_active = null; - - // restart a new browser session in case of re-connect - if (!self.sessionNew) { - self.newSession() catch |err| { - log.err("new session error: {any}", .{err}); - return; - }; - } - - log.info("accepting new conn...", .{}); - - // continue accepting incoming requests - self.loop.io.accept( - *Ctx, - self, - Ctx.acceptCbk, - self.conn_completion, - self.accept_socket, - ); + self.close(); } // shortcuts @@ -263,6 +244,15 @@ pub const Ctx = struct { return self.browser.session.env; } + inline fn acceptCompletion(self: *Ctx) *Completion { + // NOTE: the logical completion to use here is the accept_completion + // as the pipe_connection can be used simulteanously by a recv I/O operation. + // But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic + // so we use the pipe_connection to avoid this problem + if (isLinux) return self.accept_completion; + return self.conn_completion; + } + // actions // ------- @@ -272,13 +262,7 @@ pub const Ctx = struct { if (std.mem.eql(u8, cmd, "close")) { // close connection log.info("close cmd, closing conn...", .{}); - self.loop.io.close( - *Ctx, - self, - Ctx.closeCbk, - self.conn_completion, - self.conn_socket, - ); + self.cancelAndClose(); return error.Closed; } @@ -303,6 +287,47 @@ pub const Ctx = struct { } } + fn cancelAndClose(self: *Ctx) void { + if (isLinux) { // cancel is only available on Linux + self.loop.io.cancel( + *Ctx, + self, + Ctx.cancelCbk, + self.accept_completion, + self.conn_completion, + ); + } else { + self.close(); + } + } + + fn close(self: *Ctx) void { + std.posix.close(self.conn_socket); + + // conn is closed + log.debug("connection closed", .{}); + self.last_active = null; + + // restart a new browser session in case of re-connect + if (!self.sessionNew) { + self.newSession() catch |err| { + log.err("new session error: {any}", .{err}); + return; + }; + } + + log.info("accepting new conn...", .{}); + + // continue accepting incoming requests + self.loop.io.accept( + *Ctx, + self, + Ctx.acceptCbk, + self.acceptCompletion(), + self.accept_socket, + ); + } + fn newSession(self: *Ctx) !void { try self.browser.newSession(self.alloc(), self.loop); try self.browser.session.initInspector( @@ -388,6 +413,7 @@ const Send = struct { fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void { _ = result catch |err| { + log.err("send error: {any}", .{err}); self.ctx.err = err; return; }; @@ -425,6 +451,7 @@ pub fn listen( defer msg_buf.deinit(loop.alloc); // create I/O completions + var accept_completion: Completion = undefined; var conn_completion: Completion = undefined; var timeout_completion: Completion = undefined; @@ -438,6 +465,7 @@ pub fn listen( .msg_buf = &msg_buf, .accept_socket = server_socket, .timeout = timeout, + .accept_completion = &accept_completion, .conn_completion = &conn_completion, .timeout_completion = &timeout_completion, }; @@ -449,7 +477,7 @@ pub fn listen( // accepting connection asynchronously on internal server log.info("accepting new conn...", .{}); - loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket); + loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket); // infinite loop on I/O events, either: // - cmd from incoming connection on server socket diff --git a/vendor/zig-js-runtime b/vendor/zig-js-runtime index 31d188d4..f434b3cf 160000 --- a/vendor/zig-js-runtime +++ b/vendor/zig-js-runtime @@ -1 +1 @@ -Subproject commit 31d188d4fbfb0da38cd448b9a9d1ca7720cd340d +Subproject commit f434b3cfa1938277a6cd2e225974bb8d33d578c2