From 8449d5ab224f667c19e46f63199a944e6fff9b43 Mon Sep 17 00:00:00 2001 From: Francis Bouvier Date: Wed, 27 Nov 2024 21:16:16 +0100 Subject: [PATCH] websocket: use Unix socket for internal server And add an option for TCP only server Signed-off-by: Francis Bouvier --- src/handler.zig | 14 ++++++++------ src/main.zig | 42 ++++++++++++++++++++++++++++-------------- src/server.zig | 18 +++++++++++++----- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/src/handler.zig b/src/handler.zig index 4e18ba14..bcfa1dd1 100644 --- a/src/handler.zig +++ b/src/handler.zig @@ -23,20 +23,22 @@ const ws = @import("websocket"); const log = std.log.scoped(.handler); pub const Stream = struct { + addr: std.net.Address, socket: std.posix.socket_t = undefined, ws_conn: *ws.Conn = undefined, fn connectCDP(self: *Stream) !void { - const address = try std.net.Address.parseIp("127.0.0.1", 3245); - const flags: u32 = std.posix.SOCK.STREAM; - const proto = std.posix.IPPROTO.TCP; - const socket = try std.posix.socket(address.any.family, flags, proto); + const proto = blk: { + if (self.addr.any.family == std.posix.AF.UNIX) break :blk @as(u32, 0); + break :blk std.posix.IPPROTO.TCP; + }; + const socket = try std.posix.socket(self.addr.any.family, flags, proto); try std.posix.connect( socket, - &address.any, - address.getOsSockLen(), + &self.addr.any, + self.addr.getOsSockLen(), ); log.debug("connected to Stream server", .{}); self.socket = socket; diff --git a/src/main.zig b/src/main.zig index 6f40acd3..61bd2f49 100644 --- a/src/main.zig +++ b/src/main.zig @@ -80,10 +80,11 @@ const CliMode = union(CliModeTag) { host: []const u8 = Host, port: u16 = Port, timeout: u8 = Timeout, + tcp: bool = false, // undocumented TCP mode // default options const Host = "127.0.0.1"; - const Port = 3245; + const Port = 9222; const Timeout = 3; // in seconds }; @@ -145,6 +146,10 @@ const CliMode = union(CliModeTag) { return printUsageExit(execname, 1); } } + if (std.mem.eql(u8, "--tcp", opt)) { + _server.tcp = true; + continue; + } // unknown option if (std.mem.startsWith(u8, opt, "--")) { @@ -230,36 +235,45 @@ pub fn main() !void { .server => |mode| { // Stream server - const socket = server.listen(mode.addr) catch |err| { - log.err("address (host:port) {any}\n", .{err}); + const addr = blk: { + if (mode.tcp) { + break :blk mode.addr; + } else { + const unix_path = "/tmp/lightpanda"; + std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists + break :blk try std.net.Address.initUnix(unix_path); + } + }; + const socket = server.listen(addr) catch |err| { + log.err("Server listen error: {any}\n", .{err}); return printUsageExit(mode.execname, 1); }; defer std.posix.close(socket); - log.debug("Server mode: listening internally on {s}:{d}...", .{ mode.host, mode.port }); + log.debug("Server mode: listening internally on {any}...", .{addr}); - var stream = handler.Stream{}; + const timeout = std.time.ns_per_s * @as(u64, mode.timeout); // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); + // TCP server mode + if (mode.tcp) { + return server.handle(alloc, &loop, socket, null, timeout); + } + // start stream server in separate thread + var stream = handler.Stream{ .addr = addr }; const cdp_thread = try std.Thread.spawn( .{ .allocator = alloc }, server.handle, - .{ - alloc, - &loop, - socket, - &stream, - std.time.ns_per_s * @as(u64, mode.timeout), - }, + .{ alloc, &loop, socket, &stream, timeout }, ); // Websocket server var ws = try websocket.Server(handler.Handler).init(alloc, .{ - .port = 9222, - .address = "127.0.0.1", + .port = mode.port, + .address = mode.host, .handshake = .{ .timeout = 3, .max_size = 1024, diff --git a/src/server.zig b/src/server.zig index b0e33dd9..0926d5f5 100644 --- a/src/server.zig +++ b/src/server.zig @@ -518,21 +518,29 @@ fn setSockOpt(fd: std.posix.socket_t, level: i32, option: u32, value: c_int) !vo try std.posix.setsockopt(fd, level, option, &std.mem.toBytes(value)); } +fn isUnixSocket(addr: std.net.Address) bool { + return addr.any.family == std.posix.AF.UNIX; +} + pub fn listen(address: std.net.Address) !std.posix.socket_t { // create socket const flags = std.posix.SOCK.STREAM | std.posix.SOCK.CLOEXEC | std.posix.SOCK.NONBLOCK; - const sockfd = try std.posix.socket(address.any.family, flags, std.posix.IPPROTO.TCP); + const proto = if (isUnixSocket(address)) @as(u32, 0) else std.posix.IPPROTO.TCP; + const sockfd = try std.posix.socket(address.any.family, flags, proto); errdefer std.posix.close(sockfd); // socket options - try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); if (@hasDecl(std.posix.SO, "REUSEPORT")) { try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, 1); + } else { + try setSockOpt(sockfd, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, 1); } - if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS - // WARNING: disable Nagle's alogrithm to avoid latency issues - try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + if (!isUnixSocket(address)) { + if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS + // WARNING: disable Nagle's alogrithm to avoid latency issues + try setSockOpt(sockfd, std.posix.IPPROTO.TCP, std.posix.TCP.NODELAY, 1); + } } // bind & listen