From c35c09db60f565e8daf336a2a4d5ee49f5f4562b Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Tue, 8 Oct 2024 23:40:50 +0200 Subject: [PATCH] server: timeout mechanism Signed-off-by: Francis Bouvier --- src/server.zig | 158 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 126 insertions(+), 32 deletions(-) diff --git a/src/server.zig b/src/server.zig index 21a97128..25ceddab 100644 --- a/src/server.zig +++ b/src/server.zig @@ -34,6 +34,9 @@ const NoError = error{NoError}; const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError; const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; +const TimeoutCheck = std.time.ns_per_ms * 100; +const TimeoutRead = std.time.ns_per_s * 3; + // I/O Recv // -------- @@ -48,19 +51,28 @@ pub const Cmd = struct { buf: []u8, // only for read operations err: ?Error = null, - completion: *Completion, - acceptCtx: *Accept = undefined, - msg_buf: *MsgBuffer, + // I/O fields + read_completion: *Completion, + timeout_completion: *Completion, + acceptCtx: *Accept = undefined, + last_active: ?std.time.Instant = null, + // CDP state: cdp.State = .{}, // JS fields browser: *Browser, // TODO: is pointer mandatory here? + sessionNew: bool, // try_catch: public.TryCatch, // TODO - fn cbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void { + // callbacks + // --------- + + fn readCbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void { + std.debug.assert(completion == self.read_completion); + const size = result catch |err| { self.err = err; return; @@ -68,7 +80,7 @@ pub const Cmd = struct { if (size == 0) { // continue receving incomming messages asynchronously - self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf); + self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf); return; } @@ -79,12 +91,6 @@ pub const Cmd = struct { std.debug.print("\ninput size: {d}, content: {s}\n", .{ size, content }); } - // close on exit command - if (std.mem.eql(u8, input, "exit")) { - self.err = error.NoError; - return; - } - // read and execute input self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch |err| { if (err != error.Closed) { @@ -93,16 +99,93 @@ pub const Cmd = struct { return; }; + // set connection timestamp + self.last_active = std.time.Instant.now() catch |err| { + std.log.err("read timestamp error: {any}", .{err}); + return; + }; + // continue receving incomming messages asynchronously - self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf); + self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf); + } + + fn timeoutCbk(self: *Cmd, completion: *Completion, result: TimeoutError!void) void { + std.debug.assert(completion == self.timeout_completion); + + _ = result catch |err| { + self.err = err; + return; + }; + + if (self.isClosed()) { + // conn is already closed, ignore timeout + return; + } + + // check time since last read + const now = std.time.Instant.now() catch |err| { + std.log.err("timeout timestamp error: {any}", .{err}); + return; + }; + + if (now.since(self.last_active.?) > TimeoutRead) { + // closing + std.log.debug("conn timeout, closing...", .{}); + + // NOTE: we should cancel the current read + // but it seems that's just closing the connection is enough + // (and cancel does not work on MacOS) + + // close current connection + self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.timeout_completion, self.socket); + return; + } + + // continue checking timeout + self.loop.io.timeout(*Cmd, self, Cmd.timeoutCbk, self.timeout_completion, TimeoutCheck); + } + + fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void { + _ = completion; + // NOTE: completion can be either self.completion or self.timeout_completion + + _ = result catch |err| { + self.err = err; + return; + }; + + // conn is closed + self.last_active = null; + + // restart a new browser session in case of re-connect + if (!self.sessionNew) { + self.newSession() catch |err| { + std.log.err("new session error: {any}", .{err}); + return; + }; + } + + std.log.debug("conn closed", .{}); + std.log.debug("accepting new conn...", .{}); + + // continue accepting incoming requests + self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, self.read_completion, self.acceptCtx.socket); } // shortcuts + // --------- + + inline fn isClosed(self: *Cmd) bool { + // last_active is first saved on acceptCbk + return self.last_active == null; + } + + // allocator of the current session inline fn alloc(self: *Cmd) std.mem.Allocator { - // TODO: should we return the allocator from the page instead? return self.browser.currentSession().alloc; } + // JS env of the current session inline fn env(self: Cmd) public.Env { return self.browser.currentSession().env; } @@ -114,14 +197,19 @@ pub const Cmd = struct { // close cmd if (std.mem.eql(u8, cmd, "close")) { - self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.completion, self.socket); + // close connection + std.log.debug("close cmd, closing...", .{}); + self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.read_completion, self.socket); return error.Closed; } - // cdp cmd + if (self.sessionNew) self.sessionNew = false; + + // cdp end cmd const res = cdp.do(self.alloc(), cmd, self) catch |err| { - // cdp end if (err == error.DisposeBrowserContext) { + // restart a new browser session + std.log.debug("cdp end cmd", .{}); try self.newSession(); return; } @@ -136,24 +224,15 @@ pub const Cmd = struct { } fn newSession(self: *Cmd) !void { - std.log.info("new session", .{}); try self.browser.newSession(self.alloc(), self.loop); const cmd_opaque = @as(*anyopaque, @ptrCast(self)); try self.browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif); + self.sessionNew = true; + std.log.debug("new session", .{}); } - fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void { - _ = result catch |err| { - self.err = err; - return; - }; - std.log.debug("conn closed", .{}); - - // continue accepting incoming requests - self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, completion, self.acceptCtx.socket); - } - - // Inspector + // inspector + // --------- pub fn sendInspector(self: *Cmd, msg: []const u8) void { if (self.env().getInspector()) |inspector| { @@ -271,13 +350,22 @@ const Accept = struct { socket: std.posix.socket_t, fn cbk(self: *Accept, completion: *Completion, result: AcceptError!std.posix.socket_t) void { + std.debug.assert(completion == self.cmd.read_completion); + self.cmd.socket = result catch |err| { self.cmd.err = err; return; }; + // set connection timestamp and timeout + self.cmd.last_active = std.time.Instant.now() catch |err| { + std.log.err("accept timestamp error: {any}", .{err}); + return; + }; + self.cmd.loop.io.timeout(*Cmd, self.cmd, Cmd.timeoutCbk, self.cmd.timeout_completion, TimeoutCheck); + // receving incomming messages asynchronously - self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.cbk, completion, self.cmd.socket, self.cmd.buf); + self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.readCbk, self.cmd.read_completion, self.cmd.socket, self.cmd.buf); } }; @@ -290,17 +378,22 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t) var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB defer msg_buf.deinit(loop.alloc); + // create I/O completions + var completion: Completion = undefined; + var timeout_completion: Completion = undefined; + // create I/O contexts and callbacks // for accepting connections and receving messages var ctxInput: [BufReadSize]u8 = undefined; - var completion: Completion = undefined; var cmd = Cmd{ .loop = loop, .browser = browser, + .sessionNew = true, .socket = undefined, .buf = &ctxInput, .msg_buf = &msg_buf, - .completion = &completion, + .read_completion = &completion, + .timeout_completion = &timeout_completion, }; const cmd_opaque = @as(*anyopaque, @ptrCast(&cmd)); try browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif); @@ -312,6 +405,7 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t) cmd.acceptCtx = &accept; // accepting connection asynchronously on internal server + std.log.debug("accepting new conn...", .{}); loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket); // infinite loop on I/O events, either: