Merge pull request #408 from karlseguin/websocket_server
Some checks failed
wpt / web platform tests (push) Has been cancelled
zig-test / zig build dev (push) Has been cancelled
zig-test / zig build release (push) Has been cancelled
zig-test / zig test (push) Has been cancelled
wpt / perf-fmt (push) Has been cancelled
zig-test / perf-fmt (push) Has been cancelled
zig-test / demo-puppeteer (push) Has been cancelled

Make TCP server websocket-aware
This commit is contained in:
Pierre Tachoire
2025-02-13 09:04:23 +01:00
committed by GitHub
10 changed files with 1598 additions and 701 deletions

4
.gitmodules vendored
View File

@@ -28,7 +28,3 @@
[submodule "vendor/zig-async-io"]
path = vendor/zig-async-io
url = https://github.com/lightpanda-io/zig-async-io.git/
[submodule "vendor/websocket.zig"]
path = vendor/websocket.zig
url = https://github.com/lightpanda-io/websocket.zig.git/
branch = lightpanda

View File

@@ -189,11 +189,6 @@ fn common(
.root_source_file = b.path("vendor/tls.zig/src/main.zig"),
});
step.root_module.addImport("tls", tlsmod);
const wsmod = b.addModule("websocket", .{
.root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"),
});
step.root_module.addImport("websocket", wsmod);
}
fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module {

View File

@@ -131,12 +131,12 @@ fn sendInspector(
const buf = try alloc.alloc(u8, msg.json.len + 1);
defer alloc.free(buf);
_ = std.mem.replace(u8, msg.json, "\"awaitPromise\":true", "\"awaitPromise\":false", buf);
ctx.sendInspector(buf);
try ctx.sendInspector(buf);
return "";
}
}
ctx.sendInspector(msg.json);
try ctx.sendInspector(msg.json);
if (msg.id == null) return "";

View File

@@ -1,95 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const ws = @import("websocket");
const Msg = @import("msg.zig").Msg;
const log = std.log.scoped(.handler);
pub const Stream = struct {
addr: std.net.Address,
socket: std.posix.socket_t = undefined,
ws_host: []const u8,
ws_port: u16,
ws_conn: *ws.Conn = undefined,
fn connectCDP(self: *Stream) !void {
const flags: u32 = std.posix.SOCK.STREAM;
const proto = blk: {
if (self.addr.any.family == std.posix.AF.UNIX) break :blk @as(u32, 0);
break :blk std.posix.IPPROTO.TCP;
};
const socket = try std.posix.socket(self.addr.any.family, flags, proto);
try std.posix.connect(
socket,
&self.addr.any,
self.addr.getOsSockLen(),
);
log.debug("connected to Stream server", .{});
self.socket = socket;
}
fn closeCDP(self: *const Stream) void {
const close_msg: []const u8 = .{ 5, 0, 0, 0 } ++ "close";
self.recv(close_msg) catch |err| {
log.err("stream close error: {any}", .{err});
};
std.posix.close(self.socket);
}
fn start(self: *Stream, ws_conn: *ws.Conn) !void {
try self.connectCDP();
self.ws_conn = ws_conn;
}
pub fn recv(self: *const Stream, data: []const u8) !void {
var pos: usize = 0;
while (pos < data.len) {
const len = try std.posix.write(self.socket, data[pos..]);
pos += len;
}
}
pub fn send(self: *const Stream, data: []const u8) !void {
return self.ws_conn.write(data);
}
};
pub const Handler = struct {
stream: *Stream,
pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler {
try stream.start(ws_conn);
return .{ .stream = stream };
}
pub fn close(self: *Handler) void {
self.stream.closeCDP();
}
pub fn clientMessage(self: *Handler, data: []const u8) !void {
var header: [4]u8 = undefined;
Msg.setSize(data.len, &header);
try self.stream.recv(&header);
try self.stream.recv(data);
}
};

View File

@@ -20,12 +20,9 @@ const std = @import("std");
const builtin = @import("builtin");
const jsruntime = @import("jsruntime");
const websocket = @import("websocket");
const Browser = @import("browser/browser.zig").Browser;
const server = @import("server.zig");
const handler = @import("handler.zig");
const MaxSize = @import("msg.zig").MaxSize;
const parser = @import("netsurf");
const apiweb = @import("apiweb.zig");
@@ -86,11 +83,9 @@ const CliMode = union(CliModeTag) {
const Server = struct {
execname: []const u8 = undefined,
args: *std.process.ArgIterator = undefined,
addr: std.net.Address = undefined,
host: []const u8 = Host,
port: u16 = Port,
timeout: u8 = Timeout,
tcp: bool = false, // undocumented TCP mode
// default options
const Host = "127.0.0.1";
@@ -160,10 +155,6 @@ const CliMode = union(CliModeTag) {
return printUsageExit(execname, 1);
}
}
if (std.mem.eql(u8, "--tcp", opt)) {
_server.tcp = true;
continue;
}
// unknown option
if (std.mem.startsWith(u8, opt, "--")) {
@@ -186,10 +177,6 @@ const CliMode = union(CliModeTag) {
if (default_mode == .server) {
// server mode
_server.addr = std.net.Address.parseIp4(_server.host, _server.port) catch |err| {
log.err("address (host:port) {any}\n", .{err});
return printUsageExit(execname, 1);
};
_server.execname = execname;
_server.args = args;
return CliMode{ .server = _server };
@@ -247,65 +234,19 @@ pub fn main() !void {
switch (cli_mode) {
.server => |opts| {
// Stream server
const addr = blk: {
if (opts.tcp) {
break :blk opts.addr;
} else {
const unix_path = "/tmp/lightpanda";
std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists
break :blk try std.net.Address.initUnix(unix_path);
}
};
const socket = server.listen(addr) catch |err| {
log.err("Server listen error: {any}\n", .{err});
const address = std.net.Address.parseIp4(opts.host, opts.port) catch |err| {
log.err("address (host:port) {any}\n", .{err});
return printUsageExit(opts.execname, 1);
};
defer std.posix.close(socket);
log.debug("Server opts: listening internally on {any}...", .{addr});
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
// loop
var loop = try jsruntime.Loop.init(alloc);
defer loop.deinit();
// TCP server mode
if (opts.tcp) {
return server.handle(alloc, &loop, socket, null, timeout);
}
// start stream server in separate thread
var stream = handler.Stream{
.ws_host = opts.host,
.ws_port = opts.port,
.addr = addr,
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
server.run(alloc, address, timeout, &loop) catch |err| {
log.err("Server error", .{});
return err;
};
const cdp_thread = try std.Thread.spawn(
.{ .allocator = alloc },
server.handle,
.{ alloc, &loop, socket, &stream, timeout },
);
// Websocket server
var ws = try websocket.Server(handler.Handler).init(alloc, .{
.port = opts.port,
.address = opts.host,
.max_message_size = MaxSize + 14, // overhead websocket
.max_conn = 1,
.handshake = .{
.timeout = 3,
.max_size = 1024,
// since we aren't using hanshake.headers
// we can set this to 0 to save a few bytes.
.max_headers = 0,
},
});
defer ws.deinit();
try ws.listen(&stream);
cdp_thread.join();
},
.fetch => |opts| {

View File

@@ -314,9 +314,6 @@ const kb = 1024;
const ms = std.time.ns_per_ms;
test {
const msgTest = @import("msg.zig");
std.testing.refAllDecls(msgTest);
const dumpTest = @import("browser/dump.zig");
std.testing.refAllDecls(dumpTest);
@@ -340,6 +337,12 @@ test {
std.testing.refAllDecls(@import("generate.zig"));
std.testing.refAllDecls(@import("cdp/msg.zig"));
// Don't use refAllDecls, as this will pull in the entire project
// and break the test build.
// We should fix this. See this branch & the commit message for details:
// https://github.com/karlseguin/browser/commit/193ab5ceab3d3758ea06db04f7690460d79eb79e
_ = @import("server.zig");
}
fn testJSRuntime(alloc: std.mem.Allocator) !void {

View File

@@ -1,166 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
pub const HeaderSize = 4;
pub const MsgSize = 256 * 1204; // 256KB
// NOTE: Theorically we could go up to 4GB with a 4 bytes binary encoding
// but we prefer to put a lower hard limit for obvious memory size reasons.
pub const MaxSize = HeaderSize + MsgSize;
pub const Msg = struct {
pub fn getSize(data: []const u8) usize {
return std.mem.readInt(u32, data[0..HeaderSize], .little);
}
pub fn setSize(len: usize, header: *[4]u8) void {
std.mem.writeInt(u32, header, @intCast(len), .little);
}
};
/// Buffer returns messages from a raw text read stream,
/// with the message size being encoded on the 2 first bytes (little endian)
/// It handles both:
/// - combined messages in one read
/// - single message in several reads (multipart)
/// It's safe (and a good practice) to reuse the same Buffer
/// on several reads of the same stream.
pub const Buffer = struct {
buf: []u8,
size: usize = 0,
pos: usize = 0,
fn isFinished(self: *const Buffer) bool {
return self.pos >= self.size;
}
fn isEmpty(self: *const Buffer) bool {
return self.size == 0 and self.pos == 0;
}
fn reset(self: *Buffer) void {
self.size = 0;
self.pos = 0;
}
// read input
pub fn read(self: *Buffer, input: []const u8) !struct {
msg: []const u8,
left: []const u8,
} {
var _input = input; // make input writable
// msg size
var msg_size: usize = undefined;
if (self.isEmpty()) {
// decode msg size header
msg_size = Msg.getSize(_input);
_input = _input[HeaderSize..];
} else {
msg_size = self.size;
}
// multipart
const is_multipart = !self.isEmpty() or _input.len < msg_size;
if (is_multipart) {
// set msg size on empty Buffer
if (self.isEmpty()) {
self.size = msg_size;
}
// get the new position of the cursor
const new_pos = self.pos + _input.len;
// check max limit size
if (new_pos > MaxSize) {
return error.MsgTooBig;
}
// copy the current input into Buffer
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
// Intead we just use std.mem.copyForwards
std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]);
// set the new cursor position
self.pos = new_pos;
// if multipart is not finished, go fetch the next input
if (!self.isFinished()) return error.MsgMultipart;
// otherwhise multipart is finished, use its buffer as input
_input = self.buf[0..self.pos];
self.reset();
}
// handle several JSON msg in 1 read
return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] };
}
};
test "Buffer" {
const Case = struct {
input: []const u8,
nb: u8,
};
const cases = [_]Case{
// simple
.{ .input = .{ 2, 0, 0, 0 } ++ "ok", .nb = 1 },
// combined
.{ .input = .{ 2, 0, 0, 0 } ++ "ok" ++ .{ 3, 0, 0, 0 } ++ "foo", .nb = 2 },
// multipart
.{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
.{ .input = "part", .nb = 1 },
// multipart & combined
.{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
.{ .input = "part" ++ .{ 2, 0, 0, 0 } ++ "ok", .nb = 2 },
// multipart & combined with other multipart
.{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
.{ .input = "part" ++ .{ 8, 0, 0, 0 } ++ "co", .nb = 1 },
.{ .input = "mbined", .nb = 1 },
// several multipart
.{ .input = .{ 23, 0, 0, 0 } ++ "multi", .nb = 0 },
.{ .input = "several", .nb = 0 },
.{ .input = "complex", .nb = 0 },
.{ .input = "part", .nb = 1 },
// combined & multipart
.{ .input = .{ 2, 0, 0, 0 } ++ "ok" ++ .{ 9, 0, 0, 0 } ++ "multi", .nb = 1 },
.{ .input = "part", .nb = 1 },
};
var b: [MaxSize]u8 = undefined;
var buf = Buffer{ .buf = &b };
for (cases) |case| {
var nb: u8 = 0;
var input = case.input;
while (input.len > 0) {
const parts = buf.read(input) catch |err| {
if (err == error.MsgMultipart) break; // go to the next case input
return err;
};
nb += 1;
input = parts.left;
}
try std.testing.expect(nb == case.nb);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -341,7 +341,7 @@ test {
std.testing.refAllDecls(@import("css/parser.zig"));
std.testing.refAllDecls(@import("generate.zig"));
std.testing.refAllDecls(@import("http/Client.zig"));
std.testing.refAllDecls(@import("msg.zig"));
std.testing.refAllDecls(@import("storage/storage.zig"));
std.testing.refAllDecls(@import("iterator/iterator.zig"));
std.testing.refAllDecls(@import("server.zig"));
}

Submodule vendor/websocket.zig deleted from 1b49626c78