From 325ecedf0b7f55e65e686ada12fc5fe5fe453e39 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Tue, 26 Nov 2024 12:55:48 +0100 Subject: [PATCH] websocket: first implementation Signed-off-by: Francis Bouvier --- .gitmodules | 4 ++ build.zig | 5 ++ src/cdp/cdp.zig | 2 +- src/cdp/page.zig | 2 +- src/cdp/target.zig | 4 +- src/handler.zig | 83 ++++++++++++++++++++++++ src/main.zig | 150 ++++++++++++------------------------------- src/msg.zig | 5 +- src/server.zig | 60 +++++++++++++++-- vendor/websocket.zig | 1 + 10 files changed, 196 insertions(+), 120 deletions(-) create mode 100644 src/handler.zig create mode 160000 vendor/websocket.zig diff --git a/.gitmodules b/.gitmodules index 229d1a16..48c26324 100644 --- a/.gitmodules +++ b/.gitmodules @@ -28,3 +28,7 @@ [submodule "vendor/zig-async-io"] path = vendor/zig-async-io url = git@github.com:lightpanda-io/zig-async-io.git +[submodule "vendor/websocket.zig"] + path = vendor/websocket.zig + url = git@github.com:lightpanda-io/websocket.zig.git + branch = lightpanda diff --git a/build.zig b/build.zig index 2c86f3ff..7bb94e4c 100644 --- a/build.zig +++ b/build.zig @@ -168,6 +168,11 @@ fn common( .root_source_file = b.path("vendor/tls.zig/src/main.zig"), }); step.root_module.addImport("tls", tlsmod); + + const wsmod = b.addModule("ws", .{ + .root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"), + }); + step.root_module.addImport("websocket", wsmod); } fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module { diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 14c7e3b0..387434aa 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -193,7 +193,7 @@ pub fn sendEvent( const resp = Resp{ .method = name, .params = params, .sessionId = sessionID }; const event_msg = try stringify(alloc, resp); - try server.sendAsync(ctx, event_msg); + try ctx.send(event_msg); } // Common diff --git a/src/cdp/page.zig b/src/cdp/page.zig index b2beb203..cf86297f 100644 --- a/src/cdp/page.zig +++ b/src/cdp/page.zig @@ -323,7 +323,7 @@ fn navigate( .loaderId = ctx.state.loaderID, }; const res = try result(alloc, input.id, Resp, resp, input.sessionId); - try server.sendAsync(ctx, res); + try ctx.send(res); // TODO: at this point do we need async the following actions to be async? diff --git a/src/cdp/target.zig b/src/cdp/target.zig index f65c4ac3..d038e2e8 100644 --- a/src/cdp/target.zig +++ b/src/cdp/target.zig @@ -292,7 +292,7 @@ fn disposeBrowserContext( // output const res = try result(alloc, input.id, null, .{}, null); - try server.sendAsync(ctx, res); + try ctx.send(res); return error.DisposeBrowserContext; } @@ -378,7 +378,7 @@ fn closeTarget( success: bool = true, }; const res = try result(alloc, input.id, Resp, Resp{}, null); - try server.sendAsync(ctx, res); + try ctx.send(res); // Inspector.detached event const InspectorDetached = struct { diff --git a/src/handler.zig b/src/handler.zig new file mode 100644 index 00000000..4e18ba14 --- /dev/null +++ b/src/handler.zig @@ -0,0 +1,83 @@ +// Copyright (C) 2023-2024 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); + +const ws = @import("websocket"); + +const log = std.log.scoped(.handler); + +pub const Stream = struct { + socket: std.posix.socket_t = undefined, + ws_conn: *ws.Conn = undefined, + + fn connectCDP(self: *Stream) !void { + const address = try std.net.Address.parseIp("127.0.0.1", 3245); + + const flags: u32 = std.posix.SOCK.STREAM; + const proto = std.posix.IPPROTO.TCP; + const socket = try std.posix.socket(address.any.family, flags, proto); + + try std.posix.connect( + socket, + &address.any, + address.getOsSockLen(), + ); + log.debug("connected to Stream server", .{}); + self.socket = socket; + } + + fn closeCDP(self: *const Stream) void { + std.posix.close(self.socket); + } + + fn start(self: *Stream, ws_conn: *ws.Conn) !void { + try self.connectCDP(); + self.ws_conn = ws_conn; + } + + pub fn recv(self: *const Stream, data: []const u8) !void { + var pos: usize = 0; + while (pos < data.len) { + const len = try std.posix.write(self.socket, data[pos..]); + pos += len; + } + } + + pub fn send(self: *const Stream, data: []const u8) !void { + return self.ws_conn.write(data); + } +}; + +pub const Handler = struct { + stream: *Stream, + + pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler { + try stream.start(ws_conn); + return .{ .stream = stream }; + } + + pub fn close(self: *Handler) void { + self.stream.closeCDP(); + } + + pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void { + const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data }); + try self.stream.recv(msg); + } +}; diff --git a/src/main.zig b/src/main.zig index c2106f73..6f40acd3 100644 --- a/src/main.zig +++ b/src/main.zig @@ -17,13 +17,14 @@ // along with this program. If not, see . const std = @import("std"); -const posix = std.posix; const builtin = @import("builtin"); const jsruntime = @import("jsruntime"); +const websocket = @import("websocket"); const Browser = @import("browser/browser.zig").Browser; const server = @import("server.zig"); +const handler = @import("handler.zig"); const parser = @import("netsurf"); const apiweb = @import("apiweb.zig"); @@ -32,103 +33,12 @@ pub const Types = jsruntime.reflect(apiweb.Interfaces); pub const UserContext = apiweb.UserContext; pub const IO = @import("asyncio").Wrapper(jsruntime.Loop); +// Simple blocking websocket connection model +// ie. 1 thread per ws connection without thread pool and epoll/kqueue +pub const websocket_blocking = true; + const log = std.log.scoped(.cli); -// Inspired by std.net.StreamServer in Zig < 0.12 -pub const StreamServer = struct { - /// Copied from `Options` on `init`. - kernel_backlog: u31, - reuse_address: bool, - reuse_port: bool, - nonblocking: bool, - - /// `undefined` until `listen` returns successfully. - listen_address: std.net.Address, - - sockfd: ?posix.socket_t, - - pub const Options = struct { - /// How many connections the kernel will accept on the application's behalf. - /// If more than this many connections pool in the kernel, clients will start - /// seeing "Connection refused". - kernel_backlog: u31 = 128, - - /// Enable SO.REUSEADDR on the socket. - reuse_address: bool = false, - - /// Enable SO.REUSEPORT on the socket. - reuse_port: bool = false, - - /// Non-blocking mode. - nonblocking: bool = false, - }; - - /// After this call succeeds, resources have been acquired and must - /// be released with `deinit`. - pub fn init(options: Options) StreamServer { - return StreamServer{ - .sockfd = null, - .kernel_backlog = options.kernel_backlog, - .reuse_address = options.reuse_address, - .reuse_port = options.reuse_port, - .nonblocking = options.nonblocking, - .listen_address = undefined, - }; - } - - /// Release all resources. The `StreamServer` memory becomes `undefined`. - pub fn deinit(self: *StreamServer) void { - self.close(); - self.* = undefined; - } - - fn setSockOpt(fd: posix.socket_t, level: i32, option: u32, value: c_int) !void { - try posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); - } - - pub fn listen(self: *StreamServer, address: std.net.Address) !void { - const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; - var use_sock_flags: u32 = sock_flags; - if (self.nonblocking) use_sock_flags |= posix.SOCK.NONBLOCK; - const proto = if (address.any.family == posix.AF.UNIX) @as(u32, 0) else posix.IPPROTO.TCP; - - const sockfd = try posix.socket(address.any.family, use_sock_flags, proto); - self.sockfd = sockfd; - errdefer { - posix.close(sockfd); - self.sockfd = null; - } - - // socket options - if (self.reuse_address) { - try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1); - } - if (@hasDecl(posix.SO, "REUSEPORT") and self.reuse_port) { - try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1); - } - if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS - // WARNING: disable Nagle's alogrithm to avoid latency issues - try setSockOpt(sockfd, posix.IPPROTO.TCP, posix.TCP.NODELAY, 1); - } - - var socklen = address.getOsSockLen(); - try posix.bind(sockfd, &address.any, socklen); - try posix.listen(sockfd, self.kernel_backlog); - try posix.getsockname(sockfd, &self.listen_address.any, &socklen); - } - - /// Stop listening. It is still necessary to call `deinit` after stopping listening. - /// Calling `deinit` will automatically call `close`. It is safe to call `close` when - /// not listening. - pub fn close(self: *StreamServer) void { - if (self.sockfd) |fd| { - posix.close(fd); - self.sockfd = null; - self.listen_address = undefined; - } - } -}; - const usage = \\usage: {s} [options] [URL] \\ @@ -319,27 +229,49 @@ pub fn main() !void { switch (cli_mode) { .server => |mode| { - // server - var srv = StreamServer.init(.{ - .reuse_address = true, - .reuse_port = true, - .nonblocking = true, - }); - defer srv.deinit(); - - srv.listen(mode.addr) catch |err| { + // Stream server + const socket = server.listen(mode.addr) catch |err| { log.err("address (host:port) {any}\n", .{err}); return printUsageExit(mode.execname, 1); }; - defer srv.close(); - log.info("Server mode: listening on {s}:{d}...", .{ mode.host, mode.port }); + defer std.posix.close(socket); + log.debug("Server mode: listening internally on {s}:{d}...", .{ mode.host, mode.port }); + + var stream = handler.Stream{}; // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); - // listen - try server.listen(alloc, &loop, srv.sockfd.?, std.time.ns_per_s * @as(u64, mode.timeout)); + // start stream server in separate thread + const cdp_thread = try std.Thread.spawn( + .{ .allocator = alloc }, + server.handle, + .{ + alloc, + &loop, + socket, + &stream, + std.time.ns_per_s * @as(u64, mode.timeout), + }, + ); + + // Websocket server + var ws = try websocket.Server(handler.Handler).init(alloc, .{ + .port = 9222, + .address = "127.0.0.1", + .handshake = .{ + .timeout = 3, + .max_size = 1024, + // since we aren't using hanshake.headers + // we can set this to 0 to save a few bytes. + .max_headers = 0, + }, + }); + defer ws.deinit(); + + try ws.listen(&stream); + cdp_thread.join(); }, .fetch => |mode| { diff --git a/src/msg.zig b/src/msg.zig index b3d4da5a..64a81bd9 100644 --- a/src/msg.zig +++ b/src/msg.zig @@ -102,7 +102,10 @@ pub const MsgBuffer = struct { } // copy the current input into MsgBuffer - @memcpy(self.buf[self.pos..new_pos], _input[0..]); + // NOTE: we could use @memcpy but it's not Thread-safe (alias problem) + // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/ + // Intead we just use std.mem.copyForwards + std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]); // set the new cursor position self.pos = new_pos; diff --git a/src/server.zig b/src/server.zig index 9b34c602..b0e33dd9 100644 --- a/src/server.zig +++ b/src/server.zig @@ -19,6 +19,8 @@ const std = @import("std"); const builtin = @import("builtin"); +const Stream = @import("handler.zig").Stream; + const jsruntime = @import("jsruntime"); const Completion = jsruntime.IO.Completion; const AcceptError = jsruntime.IO.AcceptError; @@ -49,6 +51,7 @@ const MaxStdOutSize = 512; // ensure debug msg are not too long pub const Ctx = struct { loop: *jsruntime.Loop, + stream: ?*Stream, // internal fields accept_socket: std.posix.socket_t, @@ -283,7 +286,18 @@ pub const Ctx = struct { // send result if (!std.mem.eql(u8, res, "")) { - return sendAsync(self, res); + return self.send(res); + } + } + + pub fn send(self: *Ctx, msg: []const u8) !void { + if (self.stream) |stream| { + // if we have a stream connection, just write on it + defer self.alloc().free(msg); + try stream.send(msg); + } else { + // otherwise write asynchronously on the socket connection + return sendAsync(self, msg); } } @@ -362,7 +376,7 @@ pub const Ctx = struct { .{ msg_open, cdp.ContextSessionID }, ); - try sendAsync(ctx, s); + try ctx.send(s); } pub fn onInspectorResp(ctx_opaque: *anyopaque, _: u32, msg: []const u8) void { @@ -422,16 +436,17 @@ const Send = struct { pub fn sendAsync(ctx: *Ctx, msg: []const u8) !void { const sd = try Send.init(ctx, msg); - ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, msg); + ctx.loop.io.send(*Send, sd, Send.asyncCbk, &sd.completion, ctx.conn_socket, sd.msg); } -// Listen -// ------ +// Listener and handler +// -------------------- -pub fn listen( +pub fn handle( alloc: std.mem.Allocator, loop: *jsruntime.Loop, server_socket: std.posix.socket_t, + stream: ?*Stream, timeout: u64, ) anyerror!void { @@ -458,6 +473,7 @@ pub fn listen( // for accepting connections and receving messages var ctx = Ctx{ .loop = loop, + .stream = stream, .browser = &browser, .sessionNew = true, .read_buf = &read_buf, @@ -497,3 +513,35 @@ pub fn listen( } } } + +fn setSockOpt(fd: std.posix.socket_t, level: i32, option: u32, value: c_int) !void { + try std.posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); +} + +pub fn listen(address: std.net.Address) !std.posix.socket_t { + + // create socket + const flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC | std.posix.SOCK.NONBLOCK; + const sockfd = try std.posix.socket(address.any.family, flags, std.posix.IPPROTO.TCP); + errdefer std.posix.close(sockfd); + + // socket options + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); + if (@hasDecl(std.posix.SO, "REUSEPORT")) { + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, 1); + } + if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS + // WARNING: disable Nagle's alogrithm to avoid latency issues + try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + } + + // bind & listen + var socklen = address.getOsSockLen(); + try std.posix.bind(sockfd, &address.any, socklen); + const kernel_backlog = 1; // default value is 128. Here we just want 1 connection + try std.posix.listen(sockfd, kernel_backlog); + var listen_address: std.net.Address = undefined; + try std.posix.getsockname(sockfd, &listen_address.any, &socklen); + + return sockfd; +} diff --git a/vendor/websocket.zig b/vendor/websocket.zig new file mode 160000 index 00000000..ba14f387 --- /dev/null +++ b/vendor/websocket.zig @@ -0,0 +1 @@ +Subproject commit ba14f387b22210667a2941c1e5e4170eb1854957