From 325ecedf0b7f55e65e686ada12fc5fe5fe453e39 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Tue, 26 Nov 2024 12:55:48 +0100 Subject: [PATCH 1/6] websocket: first implementation Signed-off-by: Francis Bouvier --- .gitmodules | 4 ++ build.zig | 5 ++ src/cdp/cdp.zig | 2 +- src/cdp/page.zig | 2 +- src/cdp/target.zig | 4 +- src/handler.zig | 83 ++++++++++++++++++++++++ src/main.zig | 150 ++++++++++++------------------------------- src/msg.zig | 5 +- src/server.zig | 60 +++++++++++++++-- vendor/websocket.zig | 1 + 10 files changed, 196 insertions(+), 120 deletions(-) create mode 100644 src/handler.zig create mode 160000 vendor/websocket.zig diff --git a/.gitmodules b/.gitmodules index 229d1a16..48c26324 100644 --- a/.gitmodules +++ b/.gitmodules @@ -28,3 +28,7 @@ [submodule "vendor/zig-async-io"] path = vendor/zig-async-io url = git@github.com:lightpanda-io/zig-async-io.git +[submodule "vendor/websocket.zig"] + path = vendor/websocket.zig + url = git@github.com:lightpanda-io/websocket.zig.git + branch = lightpanda diff --git a/build.zig b/build.zig index 2c86f3ff..7bb94e4c 100644 --- a/build.zig +++ b/build.zig @@ -168,6 +168,11 @@ fn common( .root_source_file = b.path("vendor/tls.zig/src/main.zig"), }); step.root_module.addImport("tls", tlsmod); + + const wsmod = b.addModule("ws", .{ + .root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"), + }); + step.root_module.addImport("websocket", wsmod); } fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module { diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 14c7e3b0..387434aa 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -193,7 +193,7 @@ pub fn sendEvent( const resp = Resp{ .method = name, .params = params, .sessionId = sessionID }; const event_msg = try stringify(alloc, resp); - try server.sendAsync(ctx, event_msg); + try ctx.send(event_msg); } // Common diff --git a/src/cdp/page.zig b/src/cdp/page.zig index b2beb203..cf86297f 100644 --- a/src/cdp/page.zig +++ b/src/cdp/page.zig @@ -323,7 +323,7 @@ fn navigate( .loaderId = ctx.state.loaderID, }; const res = try result(alloc, input.id, Resp, resp, input.sessionId); - try server.sendAsync(ctx, res); + try ctx.send(res); // TODO: at this point do we need async the following actions to be async? diff --git a/src/cdp/target.zig b/src/cdp/target.zig index f65c4ac3..d038e2e8 100644 --- a/src/cdp/target.zig +++ b/src/cdp/target.zig @@ -292,7 +292,7 @@ fn disposeBrowserContext( // output const res = try result(alloc, input.id, null, .{}, null); - try server.sendAsync(ctx, res); + try ctx.send(res); return error.DisposeBrowserContext; } @@ -378,7 +378,7 @@ fn closeTarget( success: bool = true, }; const res = try result(alloc, input.id, Resp, Resp{}, null); - try server.sendAsync(ctx, res); + try ctx.send(res); // Inspector.detached event const InspectorDetached = struct { diff --git a/src/handler.zig b/src/handler.zig new file mode 100644 index 00000000..4e18ba14 --- /dev/null +++ b/src/handler.zig @@ -0,0 +1,83 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); + +const ws = @import("websocket"); + +const log = std.log.scoped(.handler); + +pub const Stream = struct { + socket: std.posix.socket_t = undefined, + ws_conn: *ws.Conn = undefined, + + fn connectCDP(self: *Stream) !void { + const address = try std.net.Address.parseIp("127.0.0.1", 3245); + + const flags: u32 = std.posix.SOCK.STREAM; + const proto = std.posix.IPPROTO.TCP; + const socket = try std.posix.socket(address.any.family, flags, proto); + + try std.posix.connect( + socket, + &address.any, + address.getOsSockLen(), + ); + log.debug("connected to Stream server", .{}); + self.socket = socket; + } + + fn closeCDP(self: *const Stream) void { + std.posix.close(self.socket); + } + + fn start(self: *Stream, ws_conn: *ws.Conn) !void { + try self.connectCDP(); + self.ws_conn = ws_conn; + } + + pub fn recv(self: *const Stream, data: []const u8) !void { + var pos: usize = 0; + while (pos < data.len) { + const len = try std.posix.write(self.socket, data[pos..]); + pos += len; + } + } + + pub fn send(self: *const Stream, data: []const u8) !void { + return self.ws_conn.write(data); + } +}; + +pub const Handler = struct { + stream: *Stream, + + pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler { + try stream.start(ws_conn); + return .{ .stream = stream }; + } + + pub fn close(self: *Handler) void { + self.stream.closeCDP(); + } + + pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void { + const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data }); + try self.stream.recv(msg); + } +}; diff --git a/src/main.zig b/src/main.zig index c2106f73..6f40acd3 100644 --- a/src/main.zig +++ b/src/main.zig @@ -17,13 +17,14 @@ // along with this program. If not, see . const std = @import("std"); -const posix = std.posix; const builtin = @import("builtin"); const jsruntime = @import("jsruntime"); +const websocket = @import("websocket"); const Browser = @import("browser/browser.zig").Browser; const server = @import("server.zig"); +const handler = @import("handler.zig"); const parser = @import("netsurf"); const apiweb = @import("apiweb.zig"); @@ -32,103 +33,12 @@ pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const UserContext = apiweb.UserContext; pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); +// Simple blocking websocket connection model +// ie. 1 thread per ws connection without thread pool and epoll/kqueue +pub const websocket_blocking = true; + const log = std.log.scoped(.cli); -// Inspired by std.net.StreamServer in Zig < 0.12 -pub const StreamServer = struct { - /// Copied from `Options` on `init`. - kernel_backlog: u31, - reuse_address: bool, - reuse_port: bool, - nonblocking: bool, - - /// `undefined` until `listen` returns successfully. - listen_address: std.net.Address, - - sockfd: ?posix.socket_t, - - pub const Options = struct { - /// How many connections the kernel will accept on the application's behalf. - /// If more than this many connections pool in the kernel, clients will start - /// seeing "Connection refused". - kernel_backlog: u31 = 128, - - /// Enable SO.REUSEADDR on the socket. - reuse_address: bool = false, - - /// Enable SO.REUSEPORT on the socket. - reuse_port: bool = false, - - /// Non-blocking mode. - nonblocking: bool = false, - }; - - /// After this call succeeds, resources have been acquired and must - /// be released with `deinit`. - pub fn init(options: Options) StreamServer { - return StreamServer{ - .sockfd = null, - .kernel_backlog = options.kernel_backlog, - .reuse_address = options.reuse_address, - .reuse_port = options.reuse_port, - .nonblocking = options.nonblocking, - .listen_address = undefined, - }; - } - - /// Release all resources. The `StreamServer` memory becomes `undefined`. - pub fn deinit(self: *StreamServer) void { - self.close(); - self.* = undefined; - } - - fn setSockOpt(fd: posix.socket_t, level: i32, option: u32, value: c_int) !void { - try posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); - } - - pub fn listen(self: *StreamServer, address: std.net.Address) !void { - const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; - var use_sock_flags: u32 = sock_flags; - if (self.nonblocking) use_sock_flags |= posix.SOCK.NONBLOCK; - const proto = if (address.any.family == posix.AF.UNIX) @as(u32, 0) else posix.IPPROTO.TCP; - - const sockfd = try posix.socket(address.any.family, use_sock_flags, proto); - self.sockfd = sockfd; - errdefer { - posix.close(sockfd); - self.sockfd = null; - } - - // socket options - if (self.reuse_address) { - try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1); - } - if (@hasDecl(posix.SO, "REUSEPORT") and self.reuse_port) { - try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1); - } - if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS - // WARNING: disable Nagle's alogrithm to avoid latency issues - try setSockOpt(sockfd, posix.IPPROTO.TCP, posix.TCP.NODELAY, 1); - } - - var socklen = address.getOsSockLen(); - try posix.bind(sockfd, &address.any, socklen); - try posix.listen(sockfd, self.kernel_backlog); - try posix.getsockname(sockfd, &self.listen_address.any, &socklen); - } - - /// Stop listening. It is still necessary to call `deinit` after stopping listening. - /// Calling `deinit` will automatically call `close`. It is safe to call `close` when - /// not listening. - pub fn close(self: *StreamServer) void { - if (self.sockfd) |fd| { - posix.close(fd); - self.sockfd = null; - self.listen_address = undefined; - } - } -}; - const usage = \\usage: {s} [options] [URL] \\ @@ -319,27 +229,49 @@ pub fn main() !void { switch (cli_mode) { .server => |mode| { - // server - var srv = StreamServer.init(.{ - .reuse_address = true, - .reuse_port = true, - .nonblocking = true, - }); - defer srv.deinit(); - - srv.listen(mode.addr) catch |err| { + // Stream server + const socket = server.listen(mode.addr) catch |err| { log.err("address (host:port) {any}\n", .{err}); return printUsageExit(mode.execname, 1); }; - defer srv.close(); - log.info("Server mode: listening on {s}:{d}...", .{ mode.host, mode.port }); + defer std.posix.close(socket); + log.debug("Server mode: listening internally on {s}:{d}...", .{ mode.host, mode.port }); + + var stream = handler.Stream{}; // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); - // listen - try server.listen(alloc, &loop, srv.sockfd.?, std.time.ns_per_s * @as(u64, mode.timeout)); + // start stream server in separate thread + const cdp_thread = try std.Thread.spawn( + .{ .allocator = alloc }, + server.handle, + .{ + alloc, + &loop, + socket, + &stream, + std.time.ns_per_s * @as(u64, mode.timeout), + }, + ); + + // Websocket server + var ws = try websocket.Server(handler.Handler).init(alloc, .{ + .port = 9222, + .address = "127.0.0.1", + .handshake = .{ + .timeout = 3, + .max_size = 1024, + // since we aren't using hanshake.headers + // we can set this to 0 to save a few bytes. + .max_headers = 0, + }, + }); + defer ws.deinit(); + + try ws.listen(&stream); + cdp_thread.join(); }, .fetch => |mode| { diff --git a/src/msg.zig b/src/msg.zig index b3d4da5a..64a81bd9 100644 --- a/src/msg.zig +++ b/src/msg.zig @@ -102,7 +102,10 @@ pub const MsgBuffer = struct { } // copy the current input into MsgBuffer - @memcpy(self.buf[self.pos..new_pos], _input[0..]); + // NOTE: we could use @memcpy but it's not Thread-safe (alias problem) + // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/ + // Intead we just use std.mem.copyForwards + std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]); // set the new cursor position self.pos = new_pos; diff --git a/src/server.zig b/src/server.zig index 9b34c602..b0e33dd9 100644 --- a/src/server.zig +++ b/src/server.zig @@ -19,6 +19,8 @@ const std = @import("std"); const builtin = @import("builtin"); +const Stream = @import("handler.zig").Stream; + const jsruntime = @import("jsruntime"); const Completion = jsruntime.IO.Completion; const AcceptError = jsruntime.IO.AcceptError; @@ -49,6 +51,7 @@ const MaxStdOutSize = 512; // ensure debug msg are not too long pub const Ctx = struct { loop: *jsruntime.Loop, + stream: ?*Stream, // internal fields accept_socket: std.posix.socket_t, @@ -283,7 +286,18 @@ pub const Ctx = struct { // send result if (!std.mem.eql(u8, res, "")) { - return sendAsync(self, res); + return self.send(res); + } + } + + pub fn send(self: *Ctx, msg: []const u8) !void { + if (self.stream) |stream| { + // if we have a stream connection, just write on it + defer self.alloc().free(msg); + try stream.send(msg); + } else { + // otherwise write asynchronously on the socket connection + return sendAsync(self, msg); } } @@ -362,7 +376,7 @@ pub const Ctx = struct { .{ msg_open, cdp.ContextSessionID }, ); - try sendAsync(ctx, s); + try ctx.send(s); } pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void { @@ -422,16 +436,17 @@ const Send = struct { pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void { const sd = try Send.init(ctx, msg); - ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, msg); + ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, sd.msg); } -// Listen -// ------ +// Listener and handler +// -------------------- -pub fn listen( +pub fn handle( alloc: std.mem.Allocator, loop: *jsruntime.Loop, server_socket: std.posix.socket_t, + stream: ?*Stream, timeout: u64, ) anyerror!void { @@ -458,6 +473,7 @@ pub fn listen( // for accepting connections and receving messages var ctx = Ctx{ .loop = loop, + .stream = stream, .browser = &browser, .sessionNew = true, .read_buf = &read_buf, @@ -497,3 +513,35 @@ pub fn listen( } } } + +fn setSockOpt(fd: std.posix.socket_t, level: i32, option: u32, value: c_int) !void { + try std.posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); +} + +pub fn listen(address: std.net.Address) !std.posix.socket_t { + + // create socket + const flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC | std.posix.SOCK.NONBLOCK; + const sockfd = try std.posix.socket(address.any.family, flags, std.posix.IPPROTO.TCP); + errdefer std.posix.close(sockfd); + + // socket options + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); + if (@hasDecl(std.posix.SO, "REUSEPORT")) { + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, 1); + } + if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS + // WARNING: disable Nagle's alogrithm to avoid latency issues + try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + } + + // bind & listen + var socklen = address.getOsSockLen(); + try std.posix.bind(sockfd, &address.any, socklen); + const kernel_backlog = 1; // default value is 128. Here we just want 1 connection + try std.posix.listen(sockfd, kernel_backlog); + var listen_address: std.net.Address = undefined; + try std.posix.getsockname(sockfd, &listen_address.any, &socklen); + + return sockfd; +} diff --git a/vendor/websocket.zig b/vendor/websocket.zig new file mode 160000 index 00000000..ba14f387 --- /dev/null +++ b/vendor/websocket.zig @@ -0,0 +1 @@ +Subproject commit ba14f387b22210667a2941c1e5e4170eb1854957 From 27b50c46c35f13b17196679e8053f918f65f1145 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 27 Nov 2024 16:43:05 +0100 Subject: [PATCH 2/6] Update websokets dep Signed-off-by: Francis Bouvier --- vendor/websocket.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/websocket.zig b/vendor/websocket.zig index ba14f387..215493b7 160000 --- a/vendor/websocket.zig +++ b/vendor/websocket.zig @@ -1 +1 @@ -Subproject commit ba14f387b22210667a2941c1e5e4170eb1854957 +Subproject commit 215493b71e724d8b0d1ea89b1c1791cfa45d860a From 8449d5ab224f667c19e46f63199a944e6fff9b43 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 27 Nov 2024 21:16:16 +0100 Subject: [PATCH 3/6] websocket: use Unix socket for internal server And add an option for TCP only server Signed-off-by: Francis Bouvier --- src/handler.zig | 14 ++++++++------ src/main.zig | 42 ++++++++++++++++++++++++++++-------------- src/server.zig | 18 +++++++++++++----- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/src/handler.zig b/src/handler.zig index 4e18ba14..bcfa1dd1 100644 --- a/src/handler.zig +++ b/src/handler.zig @@ -23,20 +23,22 @@ const ws = @import("websocket"); const log = std.log.scoped(.handler); pub const Stream = struct { + addr: std.net.Address, socket: std.posix.socket_t = undefined, ws_conn: *ws.Conn = undefined, fn connectCDP(self: *Stream) !void { - const address = try std.net.Address.parseIp("127.0.0.1", 3245); - const flags: u32 = std.posix.SOCK.STREAM; - const proto = std.posix.IPPROTO.TCP; - const socket = try std.posix.socket(address.any.family, flags, proto); + const proto = blk: { + if (self.addr.any.family == std.posix.AF.UNIX) break :blk @as(u32, 0); + break :blk std.posix.IPPROTO.TCP; + }; + const socket = try std.posix.socket(self.addr.any.family, flags, proto); try std.posix.connect( socket, - &address.any, - address.getOsSockLen(), + &self.addr.any, + self.addr.getOsSockLen(), ); log.debug("connected to Stream server", .{}); self.socket = socket; diff --git a/src/main.zig b/src/main.zig index 6f40acd3..61bd2f49 100644 --- a/src/main.zig +++ b/src/main.zig @@ -80,10 +80,11 @@ const CliMode = union(CliModeTag) { host: []const u8 = Host, port: u16 = Port, timeout: u8 = Timeout, + tcp: bool = false, // undocumented TCP mode // default options const Host = "127.0.0.1"; - const Port = 3245; + const Port = 9222; const Timeout = 3; // in seconds }; @@ -145,6 +146,10 @@ const CliMode = union(CliModeTag) { return printUsageExit(execname, 1); } } + if (std.mem.eql(u8, "--tcp", opt)) { + _server.tcp = true; + continue; + } // unknown option if (std.mem.startsWith(u8, opt, "--")) { @@ -230,36 +235,45 @@ pub fn main() !void { .server => |mode| { // Stream server - const socket = server.listen(mode.addr) catch |err| { - log.err("address (host:port) {any}\n", .{err}); + const addr = blk: { + if (mode.tcp) { + break :blk mode.addr; + } else { + const unix_path = "/tmp/lightpanda"; + std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists + break :blk try std.net.Address.initUnix(unix_path); + } + }; + const socket = server.listen(addr) catch |err| { + log.err("Server listen error: {any}\n", .{err}); return printUsageExit(mode.execname, 1); }; defer std.posix.close(socket); - log.debug("Server mode: listening internally on {s}:{d}...", .{ mode.host, mode.port }); + log.debug("Server mode: listening internally on {any}...", .{addr}); - var stream = handler.Stream{}; + const timeout = std.time.ns_per_s * @as(u64, mode.timeout); // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); + // TCP server mode + if (mode.tcp) { + return server.handle(alloc, &loop, socket, null, timeout); + } + // start stream server in separate thread + var stream = handler.Stream{ .addr = addr }; const cdp_thread = try std.Thread.spawn( .{ .allocator = alloc }, server.handle, - .{ - alloc, - &loop, - socket, - &stream, - std.time.ns_per_s * @as(u64, mode.timeout), - }, + .{ alloc, &loop, socket, &stream, timeout }, ); // Websocket server var ws = try websocket.Server(handler.Handler).init(alloc, .{ - .port = 9222, - .address = "127.0.0.1", + .port = mode.port, + .address = mode.host, .handshake = .{ .timeout = 3, .max_size = 1024, diff --git a/src/server.zig b/src/server.zig index b0e33dd9..0926d5f5 100644 --- a/src/server.zig +++ b/src/server.zig @@ -518,21 +518,29 @@ fn setSockOpt(fd: std.posix.socket_t, level: i32, option: u32, value: c_int) !vo try std.posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); } +fn isUnixSocket(addr: std.net.Address) bool { + return addr.any.family == std.posix.AF.UNIX; +} + pub fn listen(address: std.net.Address) !std.posix.socket_t { // create socket const flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC | std.posix.SOCK.NONBLOCK; - const sockfd = try std.posix.socket(address.any.family, flags, std.posix.IPPROTO.TCP); + const proto = if (isUnixSocket(address)) @as(u32, 0) else std.posix.IPPROTO.TCP; + const sockfd = try std.posix.socket(address.any.family, flags, proto); errdefer std.posix.close(sockfd); // socket options - try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); if (@hasDecl(std.posix.SO, "REUSEPORT")) { try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, 1); + } else { + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); } - if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS - // WARNING: disable Nagle's alogrithm to avoid latency issues - try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + if (!isUnixSocket(address)) { + if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS + // WARNING: disable Nagle's alogrithm to avoid latency issues + try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + } } // bind & listen From 760c082757893f062222381dda3d76683db52795 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 27 Nov 2024 21:23:33 +0100 Subject: [PATCH 4/6] cli: wording mode -> opts Signed-off-by: Francis Bouvier --- src/main.zig | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/main.zig b/src/main.zig index 61bd2f49..b0cb55cf 100644 --- a/src/main.zig +++ b/src/main.zig @@ -232,12 +232,12 @@ pub fn main() !void { defer cli_mode.deinit(); switch (cli_mode) { - .server => |mode| { + .server => |opts| { // Stream server const addr = blk: { - if (mode.tcp) { - break :blk mode.addr; + if (opts.tcp) { + break :blk opts.addr; } else { const unix_path = "/tmp/lightpanda"; std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists @@ -246,19 +246,19 @@ pub fn main() !void { }; const socket = server.listen(addr) catch |err| { log.err("Server listen error: {any}\n", .{err}); - return printUsageExit(mode.execname, 1); + return printUsageExit(opts.execname, 1); }; defer std.posix.close(socket); - log.debug("Server mode: listening internally on {any}...", .{addr}); + log.debug("Server opts: listening internally on {any}...", .{addr}); - const timeout = std.time.ns_per_s * @as(u64, mode.timeout); + const timeout = std.time.ns_per_s * @as(u64, opts.timeout); // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); // TCP server mode - if (mode.tcp) { + if (opts.tcp) { return server.handle(alloc, &loop, socket, null, timeout); } @@ -272,8 +272,8 @@ pub fn main() !void { // Websocket server var ws = try websocket.Server(handler.Handler).init(alloc, .{ - .port = mode.port, - .address = mode.host, + .port = opts.port, + .address = opts.host, .handshake = .{ .timeout = 3, .max_size = 1024, @@ -288,8 +288,8 @@ pub fn main() !void { cdp_thread.join(); }, - .fetch => |mode| { - log.debug("Fetch mode: url {s}, dump {any}", .{ mode.url, mode.dump }); + .fetch => |opts| { + log.debug("Fetch mode: url {s}, dump {any}", .{ opts.url, opts.dump }); // vm const vm = jsruntime.VM.init(); @@ -307,21 +307,21 @@ pub fn main() !void { // page const page = try browser.session.createPage(); - _ = page.navigate(mode.url, null) catch |err| switch (err) { + _ = page.navigate(opts.url, null) catch |err| switch (err) { error.UnsupportedUriScheme, error.UriMissingHost => { - log.err("'{s}' is not a valid URL ({any})\n", .{ mode.url, err }); - return printUsageExit(mode.execname, 1); + log.err("'{s}' is not a valid URL ({any})\n", .{ opts.url, err }); + return printUsageExit(opts.execname, 1); }, else => { - log.err("'{s}' fetching error ({any})s\n", .{ mode.url, err }); - return printUsageExit(mode.execname, 1); + log.err("'{s}' fetching error ({any})s\n", .{ opts.url, err }); + return printUsageExit(opts.execname, 1); }, }; try page.wait(); // dump - if (mode.dump) { + if (opts.dump) { try page.dump(std.io.getStdOut()); } }, From 95ac92b343e679992ee4b691dd9cd804467edf91 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Fri, 29 Nov 2024 14:59:14 +0100 Subject: [PATCH 5/6] server: fix cancel Signed-off-by: Francis Bouvier --- src/handler.zig | 4 ++++ src/server.zig | 39 ++++++--------------------------------- 2 files changed, 10 insertions(+), 33 deletions(-) diff --git a/src/handler.zig b/src/handler.zig index bcfa1dd1..004b2734 100644 --- a/src/handler.zig +++ b/src/handler.zig @@ -45,6 +45,10 @@ pub const Stream = struct { } fn closeCDP(self: *const Stream) void { + const close_msg: []const u8 = .{ 5, 0 } ++ "close"; + self.recv(close_msg) catch |err| { + log.err("stream close error: {any}", .{err}); + }; std.posix.close(self.socket); } diff --git a/src/server.zig b/src/server.zig index 0926d5f5..7d2a41a5 100644 --- a/src/server.zig +++ b/src/server.zig @@ -120,8 +120,8 @@ pub const Ctx = struct { std.debug.assert(completion == self.conn_completion); const size = result catch |err| { - if (err == error.Canceled) { - log.debug("read canceled", .{}); + if (self.isClosed() and err == error.FileDescriptorInvalid) { + log.debug("read has been canceled", .{}); return; } log.err("read error: {any}", .{err}); @@ -202,7 +202,7 @@ pub const Ctx = struct { if (now.since(self.last_active.?) > self.timeout) { // close current connection log.debug("conn timeout, closing...", .{}); - self.cancelAndClose(); + self.close(); return; } @@ -216,19 +216,6 @@ pub const Ctx = struct { ); } - fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void { - std.debug.assert(completion == self.accept_completion); - - _ = result catch |err| { - log.err("cancel error: {any}", .{err}); - self.err = err; - return; - }; - log.debug("cancel done", .{}); - - self.close(); - } - // shortcuts // --------- @@ -265,7 +252,7 @@ pub const Ctx = struct { if (std.mem.eql(u8, cmd, "close")) { // close connection log.info("close cmd, closing conn...", .{}); - self.cancelAndClose(); + self.close(); return error.Closed; } @@ -301,26 +288,12 @@ pub const Ctx = struct { } } - fn cancelAndClose(self: *Ctx) void { - if (isLinux) { // cancel is only available on Linux - self.loop.io.cancel( - *Ctx, - self, - Ctx.cancelCbk, - self.accept_completion, - self.conn_completion, - ); - } else { - self.close(); - } - } - fn close(self: *Ctx) void { - std.posix.close(self.conn_socket); // conn is closed - log.debug("connection closed", .{}); self.last_active = null; + std.posix.close(self.conn_socket); + log.debug("connection closed", .{}); // restart a new browser session in case of re-connect if (!self.sessionNew) { From d95462073a45e9f8305b9eccdefa34b70e66f08e Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Fri, 29 Nov 2024 15:04:02 +0100 Subject: [PATCH 6/6] websockets: fix port default in help Signed-off-by: Francis Bouvier --- build.zig | 2 +- src/handler.zig | 2 +- src/main.zig | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build.zig b/build.zig index 7bb94e4c..b937252a 100644 --- a/build.zig +++ b/build.zig @@ -169,7 +169,7 @@ fn common( }); step.root_module.addImport("tls", tlsmod); - const wsmod = b.addModule("ws", .{ + const wsmod = b.addModule("websocket", .{ .root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"), }); step.root_module.addImport("websocket", wsmod); diff --git a/src/handler.zig b/src/handler.zig index 004b2734..4a9ba06a 100644 --- a/src/handler.zig +++ b/src/handler.zig @@ -45,7 +45,7 @@ pub const Stream = struct { } fn closeCDP(self: *const Stream) void { - const close_msg: []const u8 = .{ 5, 0 } ++ "close"; + const close_msg: []const u8 = "5:close"; self.recv(close_msg) catch |err| { log.err("stream close error: {any}", .{err}); }; diff --git a/src/main.zig b/src/main.zig index b0cb55cf..82d2374f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -49,7 +49,7 @@ const usage = \\ \\ -h, --help Print this help message and exit. \\ --host Host of the CDP server (default "127.0.0.1") - \\ --port Port of the CDP server (default "3245") + \\ --port Port of the CDP server (default "9222") \\ --timeout Timeout for incoming connections of the CDP server (in seconds, default "3") \\ --dump Dump document in stdout (fetch mode only) \\