server: timeout mechanism

Signed-off-by: Francis Bouvier <francis@lightpanda.io>
This commit is contained in:
Francis Bouvier
2024-10-08 23:40:50 +02:00
parent 49adb61146
commit c35c09db60

View File

@@ -34,6 +34,9 @@ const NoError = error{NoError};
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError;
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
const TimeoutCheck = std.time.ns_per_ms * 100;
const TimeoutRead = std.time.ns_per_s * 3;
// I/O Recv
// --------
@@ -48,19 +51,28 @@ pub const Cmd = struct {
buf: []u8, // only for read operations
err: ?Error = null,
completion: *Completion,
acceptCtx: *Accept = undefined,
msg_buf: *MsgBuffer,
// I/O fields
read_completion: *Completion,
timeout_completion: *Completion,
acceptCtx: *Accept = undefined,
last_active: ?std.time.Instant = null,
// CDP
state: cdp.State = .{},
// JS fields
browser: *Browser, // TODO: is pointer mandatory here?
sessionNew: bool,
// try_catch: public.TryCatch, // TODO
fn cbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void {
// callbacks
// ---------
fn readCbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void {
std.debug.assert(completion == self.read_completion);
const size = result catch |err| {
self.err = err;
return;
@@ -68,7 +80,7 @@ pub const Cmd = struct {
if (size == 0) {
// continue receving incomming messages asynchronously
self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf);
return;
}
@@ -79,12 +91,6 @@ pub const Cmd = struct {
std.debug.print("\ninput size: {d}, content: {s}\n", .{ size, content });
}
// close on exit command
if (std.mem.eql(u8, input, "exit")) {
self.err = error.NoError;
return;
}
// read and execute input
self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch |err| {
if (err != error.Closed) {
@@ -93,16 +99,93 @@ pub const Cmd = struct {
return;
};
// set connection timestamp
self.last_active = std.time.Instant.now() catch |err| {
std.log.err("read timestamp error: {any}", .{err});
return;
};
// continue receving incomming messages asynchronously
self.loop.io.recv(*Cmd, self, cbk, completion, self.socket, self.buf);
self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf);
}
fn timeoutCbk(self: *Cmd, completion: *Completion, result: TimeoutError!void) void {
std.debug.assert(completion == self.timeout_completion);
_ = result catch |err| {
self.err = err;
return;
};
if (self.isClosed()) {
// conn is already closed, ignore timeout
return;
}
// check time since last read
const now = std.time.Instant.now() catch |err| {
std.log.err("timeout timestamp error: {any}", .{err});
return;
};
if (now.since(self.last_active.?) > TimeoutRead) {
// closing
std.log.debug("conn timeout, closing...", .{});
// NOTE: we should cancel the current read
// but it seems that's just closing the connection is enough
// (and cancel does not work on MacOS)
// close current connection
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.timeout_completion, self.socket);
return;
}
// continue checking timeout
self.loop.io.timeout(*Cmd, self, Cmd.timeoutCbk, self.timeout_completion, TimeoutCheck);
}
fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void {
_ = completion;
// NOTE: completion can be either self.completion or self.timeout_completion
_ = result catch |err| {
self.err = err;
return;
};
// conn is closed
self.last_active = null;
// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
std.log.err("new session error: {any}", .{err});
return;
};
}
std.log.debug("conn closed", .{});
std.log.debug("accepting new conn...", .{});
// continue accepting incoming requests
self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, self.read_completion, self.acceptCtx.socket);
}
// shortcuts
// ---------
inline fn isClosed(self: *Cmd) bool {
// last_active is first saved on acceptCbk
return self.last_active == null;
}
// allocator of the current session
inline fn alloc(self: *Cmd) std.mem.Allocator {
// TODO: should we return the allocator from the page instead?
return self.browser.currentSession().alloc;
}
// JS env of the current session
inline fn env(self: Cmd) public.Env {
return self.browser.currentSession().env;
}
@@ -114,14 +197,19 @@ pub const Cmd = struct {
// close cmd
if (std.mem.eql(u8, cmd, "close")) {
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.completion, self.socket);
// close connection
std.log.debug("close cmd, closing...", .{});
self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.read_completion, self.socket);
return error.Closed;
}
// cdp cmd
if (self.sessionNew) self.sessionNew = false;
// cdp end cmd
const res = cdp.do(self.alloc(), cmd, self) catch |err| {
// cdp end
if (err == error.DisposeBrowserContext) {
// restart a new browser session
std.log.debug("cdp end cmd", .{});
try self.newSession();
return;
}
@@ -136,24 +224,15 @@ pub const Cmd = struct {
}
fn newSession(self: *Cmd) !void {
std.log.info("new session", .{});
try self.browser.newSession(self.alloc(), self.loop);
const cmd_opaque = @as(*anyopaque, @ptrCast(self));
try self.browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif);
self.sessionNew = true;
std.log.debug("new session", .{});
}
fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void {
_ = result catch |err| {
self.err = err;
return;
};
std.log.debug("conn closed", .{});
// continue accepting incoming requests
self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, completion, self.acceptCtx.socket);
}
// Inspector
// inspector
// ---------
pub fn sendInspector(self: *Cmd, msg: []const u8) void {
if (self.env().getInspector()) |inspector| {
@@ -271,13 +350,22 @@ const Accept = struct {
socket: std.posix.socket_t,
fn cbk(self: *Accept, completion: *Completion, result: AcceptError!std.posix.socket_t) void {
std.debug.assert(completion == self.cmd.read_completion);
self.cmd.socket = result catch |err| {
self.cmd.err = err;
return;
};
// set connection timestamp and timeout
self.cmd.last_active = std.time.Instant.now() catch |err| {
std.log.err("accept timestamp error: {any}", .{err});
return;
};
self.cmd.loop.io.timeout(*Cmd, self.cmd, Cmd.timeoutCbk, self.cmd.timeout_completion, TimeoutCheck);
// receving incomming messages asynchronously
self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.cbk, completion, self.cmd.socket, self.cmd.buf);
self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.readCbk, self.cmd.read_completion, self.cmd.socket, self.cmd.buf);
}
};
@@ -290,17 +378,22 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t)
var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB
defer msg_buf.deinit(loop.alloc);
// create I/O completions
var completion: Completion = undefined;
var timeout_completion: Completion = undefined;
// create I/O contexts and callbacks
// for accepting connections and receving messages
var ctxInput: [BufReadSize]u8 = undefined;
var completion: Completion = undefined;
var cmd = Cmd{
.loop = loop,
.browser = browser,
.sessionNew = true,
.socket = undefined,
.buf = &ctxInput,
.msg_buf = &msg_buf,
.completion = &completion,
.read_completion = &completion,
.timeout_completion = &timeout_completion,
};
const cmd_opaque = @as(*anyopaque, @ptrCast(&cmd));
try browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif);
@@ -312,6 +405,7 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t)
cmd.acceptCtx = &accept;
// accepting connection asynchronously on internal server
std.log.debug("accepting new conn...", .{});
loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket);
// infinite loop on I/O events, either: