From c8a91d4cf6beb152724e9e831d7c17e11b944103 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 9 Oct 2024 00:01:49 +0200 Subject: [PATCH] server: merge Cmd and Accept in Ctx Signed-off-by: Francis Bouvier --- src/cdp/browser.zig | 2 +- src/cdp/cdp.zig | 2 +- src/cdp/emulation.zig | 2 +- src/cdp/fetch.zig | 2 +- src/cdp/log.zig | 2 +- src/cdp/network.zig | 2 +- src/cdp/page.zig | 2 +- src/cdp/performance.zig | 2 +- src/cdp/runtime.zig | 2 +- src/cdp/target.zig | 2 +- src/server.zig | 160 ++++++++++++++++++---------------------- 11 files changed, 83 insertions(+), 97 deletions(-) diff --git a/src/cdp/browser.zig b/src/cdp/browser.zig index 7a970f8b..60da80e1 100644 --- a/src/cdp/browser.zig +++ b/src/cdp/browser.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 245659cc..94fd8e9d 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const browser = @import("browser.zig").browser; const target = @import("target.zig").target; diff --git a/src/cdp/emulation.zig b/src/cdp/emulation.zig index 3f8f7b57..7d3e1191 100644 --- a/src/cdp/emulation.zig +++ b/src/cdp/emulation.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/fetch.zig b/src/cdp/fetch.zig index c3c390d9..c9def093 100644 --- a/src/cdp/fetch.zig +++ b/src/cdp/fetch.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/log.zig b/src/cdp/log.zig index ebecea7a..26758e0c 100644 --- a/src/cdp/log.zig +++ b/src/cdp/log.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/network.zig b/src/cdp/network.zig index 49a6a7d9..9e366167 100644 --- a/src/cdp/network.zig +++ b/src/cdp/network.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/page.zig b/src/cdp/page.zig index 57591088..7bf3eceb 100644 --- a/src/cdp/page.zig +++ b/src/cdp/page.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/performance.zig b/src/cdp/performance.zig index 51395ba7..5c761681 100644 --- a/src/cdp/performance.zig +++ b/src/cdp/performance.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/runtime.zig b/src/cdp/runtime.zig index a0c6b90f..d0fa1dd0 100644 --- a/src/cdp/runtime.zig +++ b/src/cdp/runtime.zig @@ -22,7 +22,7 @@ const builtin = @import("builtin"); const jsruntime = @import("jsruntime"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/cdp/target.zig b/src/cdp/target.zig index 5774f570..84b3b1a6 100644 --- a/src/cdp/target.zig +++ b/src/cdp/target.zig @@ -19,7 +19,7 @@ const std = @import("std"); const server = @import("../server.zig"); -const Ctx = server.Cmd; +const Ctx = server.Ctx; const cdp = @import("cdp.zig"); const result = cdp.result; const getMsg = cdp.getMsg; diff --git a/src/server.zig b/src/server.zig index 25ceddab..0950b60c 100644 --- a/src/server.zig +++ b/src/server.zig @@ -35,28 +35,28 @@ const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutEr const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError; const TimeoutCheck = std.time.ns_per_ms * 100; -const TimeoutRead = std.time.ns_per_s * 3; +const TimeoutRead = std.time.ns_per_s * 3; // TODO: cli option -// I/O Recv +// I/O Main // -------- const BufReadSize = 1024; // 1KB const MaxStdOutSize = 512; // ensure debug msg are not too long -pub const Cmd = struct { +pub const Ctx = struct { loop: *public.Loop, // internal fields - socket: std.posix.socket_t, + accept_socket: std.posix.socket_t, + conn_socket: std.posix.socket_t = undefined, buf: []u8, // only for read operations err: ?Error = null, msg_buf: *MsgBuffer, // I/O fields - read_completion: *Completion, + conn_completion: *Completion, timeout_completion: *Completion, - acceptCtx: *Accept = undefined, last_active: ?std.time.Instant = null, // CDP @@ -70,8 +70,27 @@ pub const Cmd = struct { // callbacks // --------- - fn readCbk(self: *Cmd, completion: *Completion, result: RecvError!usize) void { - std.debug.assert(completion == self.read_completion); + fn acceptCbk(self: *Ctx, completion: *Completion, result: AcceptError!std.posix.socket_t) void { + std.debug.assert(completion == self.conn_completion); + + self.conn_socket = result catch |err| { + self.err = err; + return; + }; + + // set connection timestamp and timeout + self.last_active = std.time.Instant.now() catch |err| { + std.log.err("accept timestamp error: {any}", .{err}); + return; + }; + self.loop.io.timeout(*Ctx, self, Ctx.timeoutCbk, self.timeout_completion, TimeoutCheck); + + // receving incomming messages asynchronously + self.loop.io.recv(*Ctx, self, Ctx.readCbk, self.conn_completion, self.conn_socket, self.buf); + } + + fn readCbk(self: *Ctx, completion: *Completion, result: RecvError!usize) void { + std.debug.assert(completion == self.conn_completion); const size = result catch |err| { self.err = err; @@ -80,7 +99,7 @@ pub const Cmd = struct { if (size == 0) { // continue receving incomming messages asynchronously - self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf); + self.loop.io.recv(*Ctx, self, Ctx.readCbk, self.conn_completion, self.conn_socket, self.buf); return; } @@ -92,7 +111,7 @@ pub const Cmd = struct { } // read and execute input - self.msg_buf.read(self.alloc(), input, self, Cmd.do) catch |err| { + self.msg_buf.read(self.alloc(), input, self, Ctx.do) catch |err| { if (err != error.Closed) { std.log.err("do error: {any}", .{err}); } @@ -106,10 +125,10 @@ pub const Cmd = struct { }; // continue receving incomming messages asynchronously - self.loop.io.recv(*Cmd, self, Cmd.readCbk, self.read_completion, self.socket, self.buf); + self.loop.io.recv(*Ctx, self, Ctx.readCbk, self.conn_completion, self.conn_socket, self.buf); } - fn timeoutCbk(self: *Cmd, completion: *Completion, result: TimeoutError!void) void { + fn timeoutCbk(self: *Ctx, completion: *Completion, result: TimeoutError!void) void { std.debug.assert(completion == self.timeout_completion); _ = result catch |err| { @@ -137,17 +156,17 @@ pub const Cmd = struct { // (and cancel does not work on MacOS) // close current connection - self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.timeout_completion, self.socket); + self.loop.io.close(*Ctx, self, Ctx.closeCbk, self.timeout_completion, self.conn_socket); return; } // continue checking timeout - self.loop.io.timeout(*Cmd, self, Cmd.timeoutCbk, self.timeout_completion, TimeoutCheck); + self.loop.io.timeout(*Ctx, self, Ctx.timeoutCbk, self.timeout_completion, TimeoutCheck); } - fn closeCbk(self: *Cmd, completion: *Completion, result: CloseError!void) void { + fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void { _ = completion; - // NOTE: completion can be either self.completion or self.timeout_completion + // NOTE: completion can be either self.conn_completion or self.timeout_completion _ = result catch |err| { self.err = err; @@ -169,37 +188,37 @@ pub const Cmd = struct { std.log.debug("accepting new conn...", .{}); // continue accepting incoming requests - self.loop.io.accept(*Accept, self.acceptCtx, Accept.cbk, self.read_completion, self.acceptCtx.socket); + self.loop.io.accept(*Ctx, self, Ctx.acceptCbk, self.conn_completion, self.accept_socket); } // shortcuts // --------- - inline fn isClosed(self: *Cmd) bool { + inline fn isClosed(self: *Ctx) bool { // last_active is first saved on acceptCbk return self.last_active == null; } // allocator of the current session - inline fn alloc(self: *Cmd) std.mem.Allocator { + inline fn alloc(self: *Ctx) std.mem.Allocator { return self.browser.currentSession().alloc; } // JS env of the current session - inline fn env(self: Cmd) public.Env { + inline fn env(self: Ctx) public.Env { return self.browser.currentSession().env; } // actions // ------- - fn do(self: *Cmd, cmd: []const u8) anyerror!void { + fn do(self: *Ctx, cmd: []const u8) anyerror!void { // close cmd if (std.mem.eql(u8, cmd, "close")) { // close connection std.log.debug("close cmd, closing...", .{}); - self.loop.io.close(*Cmd, self, Cmd.closeCbk, self.read_completion, self.socket); + self.loop.io.close(*Ctx, self, Ctx.closeCbk, self.conn_completion, self.conn_socket); return error.Closed; } @@ -223,10 +242,10 @@ pub const Cmd = struct { } } - fn newSession(self: *Cmd) !void { + fn newSession(self: *Ctx) !void { try self.browser.newSession(self.alloc(), self.loop); - const cmd_opaque = @as(*anyopaque, @ptrCast(self)); - try self.browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif); + const ctx_opaque = @as(*anyopaque, @ptrCast(self)); + try self.browser.currentSession().setInspector(ctx_opaque, Ctx.onInspectorResp, Ctx.onInspectorNotif); self.sessionNew = true; std.log.debug("new session", .{}); } @@ -234,7 +253,7 @@ pub const Cmd = struct { // inspector // --------- - pub fn sendInspector(self: *Cmd, msg: []const u8) void { + pub fn sendInspector(self: *Ctx, msg: []const u8) void { if (self.env().getInspector()) |inspector| { inspector.send(self.env(), msg); } @@ -242,8 +261,8 @@ pub const Cmd = struct { pub fn onInspectorResp(cmd_opaque: *anyopaque, _: u32, msg: []const u8) void { std.log.debug("onResp biz fn called: {s}", .{msg}); - const aligned = @as(*align(@alignOf(Cmd)) anyopaque, @alignCast(cmd_opaque)); - const self = @as(*Cmd, @ptrCast(aligned)); + const aligned = @as(*align(@alignOf(Ctx)) anyopaque, @alignCast(cmd_opaque)); + const self = @as(*Ctx, @ptrCast(aligned)); const tpl = "{s},\"sessionId\":\"{s}\"}}"; const msg_open = msg[0 .. msg.len - 1]; // remove closing bracket @@ -259,8 +278,8 @@ pub const Cmd = struct { pub fn onInspectorNotif(cmd_opaque: *anyopaque, msg: []const u8) void { std.log.debug("onNotif biz fn called: {s}", .{msg}); - const aligned = @as(*align(@alignOf(Cmd)) anyopaque, @alignCast(cmd_opaque)); - const self = @as(*Cmd, @ptrCast(aligned)); + const aligned = @as(*align(@alignOf(Ctx)) anyopaque, @alignCast(cmd_opaque)); + const self = @as(*Ctx, @ptrCast(aligned)); const tpl = "{s},\"sessionId\":\"{s}\"}}"; const msg_open = msg[0 .. msg.len - 1]; // remove closing bracket @@ -280,10 +299,10 @@ pub const Cmd = struct { // -------- const Send = struct { - cmd: *Cmd, + ctx: *Ctx, buf: []const u8, - fn init(ctx: *Cmd, msg: []const u8) !struct { + fn init(ctx: *Ctx, msg: []const u8) !struct { ctx: *Send, completion: *Completion, } { @@ -291,34 +310,34 @@ const Send = struct { // recv and timeout operations, that's why we create a new completion here const completion = try ctx.alloc().create(Completion); // NOTE: to handle concurrent calls we create each time a new context - // If no concurrent calls where required we could just use the main CmdCtx + // If no concurrent calls where required we could just use the main Ctx const sd = try ctx.alloc().create(Send); sd.* = .{ - .cmd = ctx, + .ctx = ctx, .buf = msg, }; return .{ .ctx = sd, .completion = completion }; } fn deinit(self: *Send, completion: *Completion) void { - self.cmd.alloc().destroy(completion); - self.cmd.alloc().free(self.buf); - self.cmd.alloc().destroy(self); + self.ctx.alloc().destroy(completion); + self.ctx.alloc().free(self.buf); + self.ctx.alloc().destroy(self); } fn laterCbk(self: *Send, completion: *Completion, result: TimeoutError!void) void { std.log.debug("sending after", .{}); _ = result catch |err| { - self.cmd.err = err; + self.ctx.err = err; return; }; - self.cmd.loop.io.send(*Send, self, Send.asyncCbk, completion, self.cmd.socket, self.buf); + self.ctx.loop.io.send(*Send, self, Send.asyncCbk, completion, self.ctx.socket, self.buf); } fn asyncCbk(self: *Send, completion: *Completion, result: SendError!usize) void { const size = result catch |err| { - self.cmd.err = err; + self.ctx.err = err; return; }; @@ -327,86 +346,53 @@ const Send = struct { } }; -pub fn sendLater(ctx: *Cmd, msg: []const u8, ns: u63) !void { +pub fn sendLater(ctx: *Ctx, msg: []const u8, ns: u63) !void { const sd = try Send.init(ctx, msg); ctx.loop.io.timeout(*Send, sd.ctx, Send.laterCbk, sd.completion, ns); } -pub fn sendAsync(ctx: *Cmd, msg: []const u8) !void { +pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void { const sd = try Send.init(ctx, msg); - ctx.loop.io.send(*Send, sd.ctx, Send.asyncCbk, sd.completion, ctx.socket, msg); + ctx.loop.io.send(*Send, sd.ctx, Send.asyncCbk, sd.completion, ctx.conn_socket, msg); } -pub fn sendSync(ctx: *Cmd, msg: []const u8) !void { - const s = try std.posix.write(ctx.socket, msg); +pub fn sendSync(ctx: *Ctx, msg: []const u8) !void { + const s = try std.posix.write(ctx.conn_socket, msg); std.log.debug("send sync {d} bytes", .{s}); } -// I/O Accept -// ---------- - -const Accept = struct { - cmd: *Cmd, - socket: std.posix.socket_t, - - fn cbk(self: *Accept, completion: *Completion, result: AcceptError!std.posix.socket_t) void { - std.debug.assert(completion == self.cmd.read_completion); - - self.cmd.socket = result catch |err| { - self.cmd.err = err; - return; - }; - - // set connection timestamp and timeout - self.cmd.last_active = std.time.Instant.now() catch |err| { - std.log.err("accept timestamp error: {any}", .{err}); - return; - }; - self.cmd.loop.io.timeout(*Cmd, self.cmd, Cmd.timeoutCbk, self.cmd.timeout_completion, TimeoutCheck); - - // receving incomming messages asynchronously - self.cmd.loop.io.recv(*Cmd, self.cmd, Cmd.readCbk, self.cmd.read_completion, self.cmd.socket, self.cmd.buf); - } -}; - // Listen // ------ -pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t) anyerror!void { +pub fn listen(browser: *Browser, loop: *public.Loop, server_socket: std.posix.socket_t) anyerror!void { // MsgBuffer var msg_buf = try MsgBuffer.init(loop.alloc, BufReadSize * 256); // 256KB defer msg_buf.deinit(loop.alloc); // create I/O completions - var completion: Completion = undefined; + var conn_completion: Completion = undefined; var timeout_completion: Completion = undefined; // create I/O contexts and callbacks // for accepting connections and receving messages var ctxInput: [BufReadSize]u8 = undefined; - var cmd = Cmd{ + var ctx = Ctx{ .loop = loop, .browser = browser, .sessionNew = true, - .socket = undefined, .buf = &ctxInput, .msg_buf = &msg_buf, - .read_completion = &completion, + .accept_socket = server_socket, + .conn_completion = &conn_completion, .timeout_completion = &timeout_completion, }; - const cmd_opaque = @as(*anyopaque, @ptrCast(&cmd)); - try browser.currentSession().setInspector(cmd_opaque, Cmd.onInspectorResp, Cmd.onInspectorNotif); - - var accept = Accept{ - .cmd = &cmd, - .socket = socket, - }; - cmd.acceptCtx = &accept; + const ctx_opaque = @as(*anyopaque, @ptrCast(ctx)); + try browser.currentSession().setInspector(ctx_opaque, Ctx.onInspectorResp, Ctx.onInspectorNotif); // accepting connection asynchronously on internal server std.log.debug("accepting new conn...", .{}); - loop.io.accept(*Accept, &accept, Accept.cbk, &completion, socket); + loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket); // infinite loop on I/O events, either: // - cmd from incoming connection on server socket @@ -421,7 +407,7 @@ pub fn listen(browser: *Browser, loop: *public.Loop, socket: std.posix.socket_t) // } // loop.cbk_error = false; } - if (cmd.err) |err| { + if (ctx.err) |err| { if (err != error.NoError) std.log.err("Server error: {any}", .{err}); break; }