Make TCP server websocket-aware

Adding HTTP & websocket awareness to the TCP server.

HTTP server handles `GET /json/version` and websocket upgrade requests.

Conceptually, websocket handling is the same code as before, but receiving
data will parse the websocket frames and writing data will wrap it in
a websocket frame.

The previous `Ctx` was split into a `Server` and a `Client`. This was
largely done to make it easy to write unit tests: since the `Client` is
generic, all of its dependencies (i.e. the server) can be mocked out. This
also makes it a bit nicer to know whether there is a client (via the
server's client optional).

Added a MemoryPool for the Send object (I thought that was a nice touch!)

Removed MacOS hack on accept/conn completion usage.

Known issues:
- When framing an outgoing message, the entire message has to be duped. This
is no worse than how it was before, but it should be possible to eliminate
this in the future. Probably not part of this PR.

- Websocket parsing will reject continuation frames. I don't know of a single
client that will send a fragmented message (websocket has its own
message fragmentation), but we should probably still support this just in
case.

- I don't think the receive, timeout and close completions can safely be
re-used like we're doing. I believe they need to be associated with a specific
client socket.

- A new connection creates a new browser session. I think this is right (??),
but for the very first connection, we're throwing out a perfectly usable
session. I'm thinking this might be a change to how Browser/Sessions work.

- zig build test won't compile. This branch reproduces the issue with none
of these changes:
https://github.com/karlseguin/browser/tree/broken_test_build

(or, as a diff to main):
https://github.com/lightpanda-io/browser/compare/main...karlseguin:broken_test_build
This commit is contained in:
Karl Seguin
2025-02-06 22:05:01 +08:00
parent 50b53b00e0
commit c0c0694fcc
7 changed files with 1373 additions and 693 deletions

4
.gitmodules vendored
View File

@@ -28,7 +28,3 @@
[submodule "vendor/zig-async-io"]
path = vendor/zig-async-io
url = https://github.com/lightpanda-io/zig-async-io.git/
[submodule "vendor/websocket.zig"]
path = vendor/websocket.zig
url = https://github.com/lightpanda-io/websocket.zig.git/
branch = lightpanda

View File

@@ -131,12 +131,12 @@ fn sendInspector(
const buf = try alloc.alloc(u8, msg.json.len + 1);
defer alloc.free(buf);
_ = std.mem.replace(u8, msg.json, "\"awaitPromise\":true", "\"awaitPromise\":false", buf);
ctx.sendInspector(buf);
try ctx.sendInspector(buf);
return "";
}
}
ctx.sendInspector(msg.json);
try ctx.sendInspector(msg.json);
if (msg.id == null) return "";

View File

@@ -1,95 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const ws = @import("websocket");
const Msg = @import("msg.zig").Msg;
const log = std.log.scoped(.handler);
/// Stream relays bytes between a websocket connection (`ws_conn`) and a
/// socket connected to `addr`.
///
/// `socket` and `ws_conn` are left `undefined` until `start` has run, so no
/// other method may be called before it.
pub const Stream = struct {
    /// Address the stream connects to in `connectCDP`.
    addr: std.net.Address,
    /// Connected socket; only valid after a successful `connectCDP`.
    socket: std.posix.socket_t = undefined,

    ws_host: []const u8,
    ws_port: u16,
    /// Websocket connection used by `send`; only valid after `start`.
    ws_conn: *ws.Conn = undefined,

    /// Opens a stream socket and connects it to `self.addr`.
    /// On failure no socket is left open and `self.socket` is not modified.
    fn connectCDP(self: *Stream) !void {
        const flags: u32 = std.posix.SOCK.STREAM;
        // Unix domain sockets take protocol 0; anything else is treated as TCP.
        const proto: u32 = if (self.addr.any.family == std.posix.AF.UNIX)
            0
        else
            std.posix.IPPROTO.TCP;

        const socket = try std.posix.socket(self.addr.any.family, flags, proto);
        // Do not leak the file descriptor when the connection attempt fails.
        errdefer std.posix.close(socket);

        try std.posix.connect(
            socket,
            &self.addr.any,
            self.addr.getOsSockLen(),
        );
        log.debug("connected to Stream server", .{});
        self.socket = socket;
    }

    /// Writes a final "close" message to the socket, then closes it.
    /// A failure to write is only logged: the socket is closed regardless.
    fn closeCDP(self: *const Stream) void {
        // 4 bytes holding the value 5 (the length of "close"), then the text.
        // NOTE(review): presumably the little-endian size header written by
        // `Msg.setSize` - confirm against the peer reading this socket.
        const close_msg: []const u8 = .{ 5, 0, 0, 0 } ++ "close";
        self.recv(close_msg) catch |err| {
            log.err("stream close error: {any}", .{err});
        };
        std.posix.close(self.socket);
    }

    /// Connects the socket and records the websocket connection used by `send`.
    /// `ws_conn` is only stored once the socket connection has succeeded.
    fn start(self: *Stream, ws_conn: *ws.Conn) !void {
        try self.connectCDP();
        self.ws_conn = ws_conn;
    }

    /// Writes all of `data` to the socket, looping over partial writes.
    pub fn recv(self: *const Stream, data: []const u8) !void {
        var pos: usize = 0;
        while (pos < data.len) {
            const len = try std.posix.write(self.socket, data[pos..]);
            pos += len;
        }
    }

    /// Writes `data` to the websocket connection.
    pub fn send(self: *const Stream, data: []const u8) !void {
        return self.ws_conn.write(data);
    }
};
/// Websocket handler which forwards every client message to a `Stream`.
pub const Handler = struct {
    stream: *Stream,

    /// Starts `stream` with the given websocket connection and returns a
    /// handler bound to it. The handshake argument is ignored.
    pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler {
        try stream.start(ws_conn);
        return Handler{ .stream = stream };
    }

    /// Closes the underlying stream.
    pub fn close(self: *Handler) void {
        const stream = self.stream;
        stream.closeCDP();
    }

    /// Forwards `data` to the stream, preceded by a 4-byte size header
    /// produced by `Msg.setSize`.
    pub fn clientMessage(self: *Handler, data: []const u8) !void {
        const stream = self.stream;

        var size_prefix: [4]u8 = undefined;
        Msg.setSize(data.len, &size_prefix);

        // The header goes first, then the payload.
        try stream.recv(&size_prefix);
        try stream.recv(data);
    }
};

View File

@@ -20,12 +20,9 @@ const std = @import("std");
const builtin = @import("builtin");
const jsruntime = @import("jsruntime");
const websocket = @import("websocket");
const Browser = @import("browser/browser.zig").Browser;
const server = @import("server.zig");
const handler = @import("handler.zig");
const MaxSize = @import("msg.zig").MaxSize;
const parser = @import("netsurf");
const apiweb = @import("apiweb.zig");
@@ -86,11 +83,9 @@ const CliMode = union(CliModeTag) {
const Server = struct {
execname: []const u8 = undefined,
args: *std.process.ArgIterator = undefined,
addr: std.net.Address = undefined,
host: []const u8 = Host,
port: u16 = Port,
timeout: u8 = Timeout,
tcp: bool = false, // undocumented TCP mode
// default options
const Host = "127.0.0.1";
@@ -160,10 +155,6 @@ const CliMode = union(CliModeTag) {
return printUsageExit(execname, 1);
}
}
if (std.mem.eql(u8, "--tcp", opt)) {
_server.tcp = true;
continue;
}
// unknown option
if (std.mem.startsWith(u8, opt, "--")) {
@@ -186,10 +177,6 @@ const CliMode = union(CliModeTag) {
if (default_mode == .server) {
// server mode
_server.addr = std.net.Address.parseIp4(_server.host, _server.port) catch |err| {
log.err("address (host:port) {any}\n", .{err});
return printUsageExit(execname, 1);
};
_server.execname = execname;
_server.args = args;
return CliMode{ .server = _server };
@@ -247,65 +234,19 @@ pub fn main() !void {
switch (cli_mode) {
.server => |opts| {
// Stream server
const addr = blk: {
if (opts.tcp) {
break :blk opts.addr;
} else {
const unix_path = "/tmp/lightpanda";
std.fs.deleteFileAbsolute(unix_path) catch {}; // file could not exists
break :blk try std.net.Address.initUnix(unix_path);
}
};
const socket = server.listen(addr) catch |err| {
log.err("Server listen error: {any}\n", .{err});
const address = std.net.Address.parseIp4(opts.host, opts.port) catch |err| {
log.err("address (host:port) {any}\n", .{err});
return printUsageExit(opts.execname, 1);
};
defer std.posix.close(socket);
log.debug("Server opts: listening internally on {any}...", .{addr});
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
// loop
var loop = try jsruntime.Loop.init(alloc);
defer loop.deinit();
// TCP server mode
if (opts.tcp) {
return server.handle(alloc, &loop, socket, null, timeout);
}
// start stream server in separate thread
var stream = handler.Stream{
.ws_host = opts.host,
.ws_port = opts.port,
.addr = addr,
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
server.run(alloc, address, timeout, &loop) catch |err| {
log.err("Server error", .{});
return err;
};
const cdp_thread = try std.Thread.spawn(
.{ .allocator = alloc },
server.handle,
.{ alloc, &loop, socket, &stream, timeout },
);
// Websocket server
var ws = try websocket.Server(handler.Handler).init(alloc, .{
.port = opts.port,
.address = opts.host,
.max_message_size = MaxSize + 14, // overhead websocket
.max_conn = 1,
.handshake = .{
.timeout = 3,
.max_size = 1024,
// since we aren't using handshake.headers
// we can set this to 0 to save a few bytes.
.max_headers = 0,
},
});
defer ws.deinit();
try ws.listen(&stream);
cdp_thread.join();
},
.fetch => |opts| {

View File

@@ -314,9 +314,6 @@ const kb = 1024;
const ms = std.time.ns_per_ms;
test {
const msgTest = @import("msg.zig");
std.testing.refAllDecls(msgTest);
const dumpTest = @import("browser/dump.zig");
std.testing.refAllDecls(dumpTest);
@@ -340,6 +337,7 @@ test {
std.testing.refAllDecls(@import("generate.zig"));
std.testing.refAllDecls(@import("cdp/msg.zig"));
std.testing.refAllDecls(@import("server.zig"));
}
fn testJSRuntime(alloc: std.mem.Allocator) !void {

View File

@@ -1,166 +0,0 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
/// Number of bytes used to encode the size that prefixes each message.
pub const HeaderSize = 4;
/// Maximum size of a message payload (header excluded).
pub const MsgSize = 256 * 1024; // 256KB
// NOTE: Theoretically we could go up to 4GB with a 4 bytes binary encoding
// but we prefer to put a lower hard limit for obvious memory size reasons.
/// Maximum size of a full message: size header plus payload.
pub const MaxSize = HeaderSize + MsgSize;
/// Helpers to decode and encode the size header which prefixes a message.
pub const Msg = struct {
    /// Returns the size stored, as a little-endian u32, in the first
    /// `HeaderSize` bytes of `data`. `data` must hold at least that many bytes.
    pub fn getSize(data: []const u8) usize {
        const size_bytes = data[0..HeaderSize];
        return std.mem.readInt(u32, size_bytes, .little);
    }

    /// Writes `len` into `header` as a little-endian u32.
    /// `len` must fit in a u32.
    pub fn setSize(len: usize, header: *[4]u8) void {
        const size: u32 = @intCast(len);
        std.mem.writeInt(u32, header, size, .little);
    }
};
/// Buffer returns messages from a raw text read stream,
/// with the message size being encoded on the first `HeaderSize` bytes
/// (little endian).
/// It handles both:
/// - combined messages in one read
/// - single message in several reads (multipart)
/// It's safe (and a good practice) to reuse the same Buffer
/// on several reads of the same stream.
pub const Buffer = struct {
    /// Storage in which the payload of a multipart message is accumulated.
    buf: []u8,
    /// Size of the multipart message being accumulated, 0 when there is none.
    size: usize = 0,
    /// Number of payload bytes accumulated so far in `buf`.
    pos: usize = 0,

    /// True once at least `size` bytes have been accumulated.
    fn isFinished(self: *const Buffer) bool {
        return self.pos >= self.size;
    }

    /// True when no multipart message is in progress.
    fn isEmpty(self: *const Buffer) bool {
        return self.size == 0 and self.pos == 0;
    }

    fn reset(self: *Buffer) void {
        self.size = 0;
        self.pos = 0;
    }

    /// Extracts the next message from `input`.
    ///
    /// Returns the message payload in `msg` and the unconsumed bytes in
    /// `left`; callers are expected to call `read` again with `left` until it
    /// is empty. After a multipart message completes, both slices point into
    /// `self.buf`, so `msg` must be consumed before the next call to `read`.
    ///
    /// Errors:
    /// - `error.MsgTooShort`: a new message starts but `input` holds fewer
    ///   than `HeaderSize` bytes, so its size cannot be decoded.
    /// - `error.MsgMultipart`: the message is incomplete, call `read` again
    ///   with the next chunk of the stream.
    /// - `error.MsgTooBig`: the accumulated data exceeds `MaxSize` or the
    ///   capacity of `buf`.
    pub fn read(self: *Buffer, input: []const u8) !struct {
        msg: []const u8,
        left: []const u8,
    } {
        var _input = input; // make input writable

        // msg size
        var msg_size: usize = undefined;
        if (self.isEmpty()) {
            // A size header cannot be decoded from fewer than HeaderSize
            // bytes: report it rather than slicing out of bounds.
            if (_input.len < HeaderSize) return error.MsgTooShort;

            // decode msg size header
            msg_size = Msg.getSize(_input);
            _input = _input[HeaderSize..];
        } else {
            msg_size = self.size;
        }

        // multipart
        const is_multipart = !self.isEmpty() or _input.len < msg_size;
        if (is_multipart) {

            // set msg size on empty Buffer
            if (self.isEmpty()) {
                self.size = msg_size;
            }

            // get the new position of the cursor
            const new_pos = self.pos + _input.len;

            // check max limit size, and never write past the backing storage
            if (new_pos > MaxSize or new_pos > self.buf.len) {
                return error.MsgTooBig;
            }

            // copy the current input into Buffer
            // NOTE: @memcpy requires that source and destination do not
            // overlap. Here they can: `left` returned by a previous call may
            // point into self.buf, at a higher address than the destination.
            // see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
            // Instead we just use std.mem.copyForwards
            std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]);

            // set the new cursor position
            self.pos = new_pos;

            // if multipart is not finished, go fetch the next input
            if (!self.isFinished()) return error.MsgMultipart;

            // otherwise multipart is finished, use its buffer as input
            _input = self.buf[0..self.pos];
            self.reset();
        }

        // handle several JSON msg in 1 read
        return .{ .msg = _input[0..msg_size], .left = _input[msg_size..] };
    }
};
// Feeds `Buffer.read` a sequence of stream chunks and checks how many
// complete messages each chunk yields. The same Buffer is reused across all
// the cases, so a chunk may complete a message started by the previous one.
test "Buffer" {
    const Case = struct {
        // chunk handed to the Buffer
        input: []const u8,
        // number of complete messages expected out of this chunk
        nb: u8,
    };

    const cases = [_]Case{
        // simple
        .{ .input = .{ 2, 0, 0, 0 } ++ "ok", .nb = 1 },
        // combined
        .{ .input = .{ 2, 0, 0, 0 } ++ "ok" ++ .{ 3, 0, 0, 0 } ++ "foo", .nb = 2 },
        // multipart
        .{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
        .{ .input = "part", .nb = 1 },
        // multipart & combined
        .{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
        .{ .input = "part" ++ .{ 2, 0, 0, 0 } ++ "ok", .nb = 2 },
        // multipart & combined with other multipart
        .{ .input = .{ 9, 0, 0, 0 } ++ "multi", .nb = 0 },
        .{ .input = "part" ++ .{ 8, 0, 0, 0 } ++ "co", .nb = 1 },
        .{ .input = "mbined", .nb = 1 },
        // several multipart
        .{ .input = .{ 23, 0, 0, 0 } ++ "multi", .nb = 0 },
        .{ .input = "several", .nb = 0 },
        .{ .input = "complex", .nb = 0 },
        .{ .input = "part", .nb = 1 },
        // combined & multipart
        .{ .input = .{ 2, 0, 0, 0 } ++ "ok" ++ .{ 9, 0, 0, 0 } ++ "multi", .nb = 1 },
        .{ .input = "part", .nb = 1 },
    };

    var storage: [MaxSize]u8 = undefined;
    var buffer = Buffer{ .buf = &storage };

    for (cases) |case| {
        var count: u8 = 0;
        var remaining = case.input;
        // The counter only advances when an iteration completes, i.e. when
        // `read` returned a full message.
        while (remaining.len > 0) : (count += 1) {
            const parts = buffer.read(remaining) catch |err| switch (err) {
                // message incomplete: go to the next case input
                error.MsgMultipart => break,
                else => return err,
            };
            remaining = parts.left;
        }
        try std.testing.expect(count == case.nb);
    }
}

File diff suppressed because it is too large Load Diff