Give websockets their own connection pool, improve websocket message logging

This commit is contained in:
Karl Seguin
2026-04-02 10:27:03 +08:00
parent 5733c35a2d
commit 14dcb7895a
6 changed files with 406 additions and 23 deletions

View File

@@ -128,6 +128,13 @@ pub fn httpMaxResponseSize(self: *const Config) ?usize {
}; };
} }
pub fn wsMaxConcurrent(self: *const Config) u8 {
return switch (self.mode) {
inline .serve, .fetch, .mcp => |opts| opts.common.ws_max_concurrent orelse 8,
else => unreachable,
};
}
pub fn logLevel(self: *const Config) ?log.Level { pub fn logLevel(self: *const Config) ?log.Level {
return switch (self.mode) { return switch (self.mode) {
inline .serve, .fetch, .mcp => |opts| opts.common.log_level, inline .serve, .fetch, .mcp => |opts| opts.common.log_level,
@@ -275,6 +282,7 @@ pub const Common = struct {
http_timeout: ?u31 = null, http_timeout: ?u31 = null,
http_connect_timeout: ?u31 = null, http_connect_timeout: ?u31 = null,
http_max_response_size: ?usize = null, http_max_response_size: ?usize = null,
ws_max_concurrent: ?u8 = null,
tls_verify_host: bool = true, tls_verify_host: bool = true,
log_level: ?log.Level = null, log_level: ?log.Level = null,
log_format: ?log.Format = null, log_format: ?log.Format = null,
@@ -375,6 +383,10 @@ pub fn printUsageAndExit(self: *const Config, success: bool) void {
\\ (e.g. XHR, fetch, script loading, ...). \\ (e.g. XHR, fetch, script loading, ...).
\\ Defaults to no limit. \\ Defaults to no limit.
\\ \\
\\--ws-max-concurrent
\\ The maximum number of concurrent WebSocket connections.
\\ Defaults to 8.
\\
\\--log-level The log level: debug, info, warn, error or fatal. \\--log-level The log level: debug, info, warn, error or fatal.
\\ Defaults to \\ Defaults to
++ (if (builtin.mode == .Debug) " info." else "warn.") ++ ++ (if (builtin.mode == .Debug) " info." else "warn.") ++
@@ -983,6 +995,19 @@ fn parseCommonArg(
return true; return true;
} }
if (std.mem.eql(u8, "--ws-max-concurrent", opt) or std.mem.eql(u8, "--ws_max_concurrent", opt)) {
const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = opt });
return error.InvalidArgument;
};
common.ws_max_concurrent = std.fmt.parseInt(u8, str, 10) catch |err| {
log.fatal(.app, "invalid argument value", .{ .arg = opt, .err = err });
return error.InvalidArgument;
};
return true;
}
if (std.mem.eql(u8, "--log-level", opt) or std.mem.eql(u8, "--log_level", opt)) { if (std.mem.eql(u8, "--log-level", opt) or std.mem.eql(u8, "--log_level", opt)) {
const str = args.next() orelse { const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = opt }); log.fatal(.app, "missing argument value", .{ .arg = opt });

View File

@@ -238,3 +238,309 @@
}); });
} }
</script> </script>
<script id=binary_arraybuffer type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.binaryType = 'arraybuffer';
ws.addEventListener('open', () => {
const buffer = new ArrayBuffer(4);
const view = new Uint8Array(buffer);
view[0] = 10;
view[1] = 20;
view[2] = 30;
view[3] = 40;
ws.send(buffer);
});
ws.addEventListener('message', (e) => {
const arr = new Uint8Array(e.data);
received.push(arr.length);
received.push(arr[1]); // First byte of our data (after 0xEE marker)
received.push(arr[4]); // Last byte of our data
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
testing.expectEqual([5, 10, 40], received);
});
}
</script>
<script id=binary_int32array type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.binaryType = 'arraybuffer';
ws.addEventListener('open', () => {
const arr = new Int32Array([0x01020304, 0x05060708]);
ws.send(arr);
});
ws.addEventListener('message', (e) => {
received.push(e.data.byteLength);
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
// 1 marker byte + 8 bytes (2 x 4-byte int32)
testing.expectEqual([9], received);
});
}
</script>
<script id=binary_int32array22 type=module>
{
const state = await testing.async();
let received = [];
console.warn('last-test');
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.binaryType = 'arraybuffer';
ws.addEventListener('open', () => {
const arr = new Int32Array([0x01020304, 0x05060708]);
ws.send(arr);
});
ws.addEventListener('message', (e) => {
received.push(e.data.byteLength);
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
console.warn('aaa');
// 1 marker byte + 8 bytes (2 x 4-byte int32)
testing.expectEqual([9], received);
});
}
</script>
<script id=binary_blob type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.binaryType = 'arraybuffer';
ws.addEventListener('open', () => {
const blob = new Blob(['hello'], { type: 'text/plain' });
ws.send(blob);
});
ws.addEventListener('message', (e) => {
const arr = new Uint8Array(e.data);
received.push(arr.length);
received.push(arr[0]); // 0xEE marker
// 'h' = 104, 'e' = 101, 'l' = 108
received.push(arr[1]); // 'h'
received.push(arr[2]); // 'e'
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
testing.expectEqual([6, 0xEE, 104, 101], received);
});
}
</script>
<script id=server_close type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.addEventListener('open', () => {
ws.send('close:1001:going away');
});
ws.addEventListener('close', (e) => {
received.push(e.code);
received.push(e.reason);
received.push(e.wasClean);
state.resolve();
});
await state.done(() => {
testing.expectEqual([1001, 'going away', true], received);
});
}
</script>
<script id=force_close type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.addEventListener('open', () => {
ws.send('force-close');
});
ws.addEventListener('close', (e) => {
received.push('closed');
received.push(e.wasClean);
state.resolve();
});
ws.addEventListener('error', () => {
received.push('error');
});
await state.done(() => {
// Connection was not cleanly closed
testing.expectEqual(['closed', false], received);
});
}
</script>
<script id=ready_state type=module>
{
const state = await testing.async();
let states = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
states.push(ws.readyState); // CONNECTING = 0
ws.addEventListener('open', () => {
states.push(ws.readyState); // OPEN = 1
ws.close();
states.push(ws.readyState); // CLOSING = 2
});
ws.addEventListener('close', () => {
states.push(ws.readyState); // CLOSED = 3
state.resolve();
});
await state.done(() => {
testing.expectEqual([0, 1, 2, 3], states);
});
}
</script>
<script id=buffered_amount type=module>
{
const state = await testing.async();
let results = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.addEventListener('open', () => {
results.push(ws.bufferedAmount); // Should be 0 initially
ws.send('test');
// bufferedAmount might be non-zero right after send
// but will go to 0 after message is sent
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
testing.expectEqual([0], results);
});
}
</script>
<script id=handler_properties type=module>
{
const state = await testing.async();
let received = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.onopen = () => {
ws.send('handler-test');
};
ws.onmessage = (e) => {
received.push(e.data);
ws.close();
};
ws.onclose = () => {
received.push('closed');
state.resolve();
};
await state.done(() => {
testing.expectEqual(['echo-handler-test', 'closed'], received);
});
}
</script>
<script id=binary_type type=module>
{
const state = await testing.async();
let results = [];
let ws = new WebSocket('ws://127.0.0.1:9584/');
ws.addEventListener('open', () => {
results.push(ws.binaryType); // Default is 'blob'
ws.binaryType = 'arraybuffer';
results.push(ws.binaryType);
ws.binaryType = 'blob';
results.push(ws.binaryType);
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
testing.expectEqual(['blob', 'arraybuffer', 'blob'], results);
});
}
</script>
<script id=url_property type=module>
{
const state = await testing.async();
let result = null;
let ws = new WebSocket('ws://127.0.0.1:9584/path');
ws.addEventListener('open', () => {
result = ws.url;
ws.close();
});
ws.addEventListener('close', () => {
state.resolve();
});
await state.done(() => {
testing.expectEqual('ws://127.0.0.1:9584/path', result);
});
}
</script>

View File

@@ -107,10 +107,7 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket {
const resolved_url = try URL.resolve(arena, page.base(), url, .{ .always_dupe = true, .encode = true }); const resolved_url = try URL.resolve(arena, page.base(), url, .{ .always_dupe = true, .encode = true });
const http_client = page._session.browser.http_client; const http_client = page._session.browser.http_client;
const conn = http_client.network.getConnection() orelse { const conn = http_client.network.newConnection() orelse {
// TODO: figure out how/where we actually want to get WebSocket connections
// from. I feel like sharing this with the HTTP Connection Pool is a
// mistake.
return error.NoFreeConnection; return error.NoFreeConnection;
}; };
@@ -135,7 +132,7 @@ pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket {
try http_client.trackConn(conn); try http_client.trackConn(conn);
if (comptime IS_DEBUG) { if (comptime IS_DEBUG) {
log.info(.http, "WS connecting", .{ .url = url }); log.info(.websocket, "connecting", .{ .url = url });
} }
// Unlike an XHR object where we only selectively reference the instance // Unlike an XHR object where we only selectively reference the instance
@@ -179,9 +176,9 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
self._ready_state = .closed; self._ready_state = .closed;
if (err_) |err| { if (err_) |err| {
log.warn(.http, "WS disconnected", .{ .err = err, .url = self._url }); log.warn(.websocket, "disconnected", .{ .err = err, .url = self._url });
} else { } else {
log.info(.http, "WS disconnected", .{ .url = self._url, .reason = "closed" }); log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" });
} }
self.cleanup(); self.cleanup();
@@ -191,7 +188,7 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
const reason = if (was_clean) self._close_reason else ""; const reason = if (was_clean) self._close_reason else "";
self.dispatchCloseEvent(code, reason, was_clean) catch |err| { self.dispatchCloseEvent(code, reason, was_clean) catch |err| {
log.err(.http, "WS close event dispatch failed", .{ .err = err }); log.err(.websocket, "close event dispatch failed", .{ .err = err });
}; };
} }
@@ -413,6 +410,7 @@ fn dispatchOpenEvent(self: *WebSocket) !void {
} }
fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void { fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void {
std.debug.print("{any} {s}\n", .{ frame_type, data });
const page = self._page; const page = self._page;
const target = self.asEventTarget(); const target = self.asEventTarget();
@@ -450,7 +448,7 @@ fn sendDataCallback(buffer: [*]u8, buf_count: usize, buf_len: usize, data: *anyo
} }
const conn: *http.Connection = @ptrCast(@alignCast(data)); const conn: *http.Connection = @ptrCast(@alignCast(data));
return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| { return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| {
log.warn(.http, "WS send callback", .{ .err = err }); log.warn(.websocket, "send callback", .{ .err = err });
return http.readfunc_pause; return http.readfunc_pause;
}; };
} }
@@ -499,6 +497,9 @@ fn _sendDataCallback(conn: *http.Connection, buf: []u8) !usize {
fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize { fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize {
if (self._send_offset == 0) { if (self._send_offset == 0) {
// start of the message // start of the message
if (comptime IS_DEBUG) {
log.debug(.websocket, "send start", .{ .url = self._url, .len = byte_msg.data.len });
}
try conn.wsStartFrame(frame_type, byte_msg.data.len); try conn.wsStartFrame(frame_type, byte_msg.data.len);
} }
@@ -511,6 +512,9 @@ fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: M
if (self._send_offset >= byte_msg.data.len) { if (self._send_offset >= byte_msg.data.len) {
const removed = self._send_queue.orderedRemove(0); const removed = self._send_queue.orderedRemove(0);
removed.deinit(self._page._session); removed.deinit(self._page._session);
if (comptime IS_DEBUG) {
log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len });
}
self._send_offset = 0; self._send_offset = 0;
} }
@@ -523,7 +527,7 @@ fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, d
} }
const conn: *http.Connection = @ptrCast(@alignCast(data)); const conn: *http.Connection = @ptrCast(@alignCast(data));
_receivedDataCallback(conn, buffer[0..buf_len]) catch |err| { _receivedDataCallback(conn, buffer[0..buf_len]) catch |err| {
log.warn(.http, "WS receive callback", .{ .err = err }); log.warn(.websocket, "receive callback", .{ .err = err });
// TODO: are there errors, like an invalid frame, that we shouldn't treat // TODO: are there errors, like an invalid frame, that we shouldn't treat
// as an error? // as an error?
return http.writefunc_error; return http.writefunc_error;
@@ -535,11 +539,14 @@ fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, d
fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
const self = conn.transport.websocket; const self = conn.transport.websocket;
const meta = conn.wsMeta() orelse { const meta = conn.wsMeta() orelse {
log.err(.http, "WS missing meta", .{ .url = self._url }); log.err(.websocket, "missing meta", .{ .url = self._url });
return error.NoFrameMeta; return error.NoFrameMeta;
}; };
if (meta.offset == 0) { if (meta.offset == 0) {
if (comptime IS_DEBUG) {
log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type });
}
// Start of new frame. Pre-allocate buffer // Start of new frame. Pre-allocate buffer
self._recv_buffer.clearRetainingCapacity(); self._recv_buffer.clearRetainingCapacity();
if (meta.len > self._http_client.max_response_size) { if (meta.len > self._http_client.max_response_size) {
@@ -598,10 +605,10 @@ fn receivedHeaderCalllback(buffer: [*]const u8, header_count: usize, buf_len: us
} }
self._ready_state = .open; self._ready_state = .open;
log.info(.http, "WS connected", .{ .url = self._url }); log.info(.websocket, "connected", .{ .url = self._url });
self.dispatchOpenEvent() catch |err| { self.dispatchOpenEvent() catch |err| {
log.err(.http, "WS open event fail", .{ .err = err }); log.err(.websocket, "open event fail", .{ .err = err });
}; };
return buf_len; return buf_len;
} }

View File

@@ -40,6 +40,7 @@ pub const Scope = enum {
unknown_prop, unknown_prop,
mcp, mcp,
cache, cache,
websocket,
}; };
const Opts = struct { const Opts = struct {

View File

@@ -61,6 +61,11 @@ connections: []http.Connection,
available: std.DoublyLinkedList = .{}, available: std.DoublyLinkedList = .{},
conn_mutex: std.Thread.Mutex = .{}, conn_mutex: std.Thread.Mutex = .{},
ws_pool: std.heap.MemoryPool(http.Connection),
ws_count: usize = 0,
ws_max: u8,
ws_mutex: std.Thread.Mutex = .{},
pollfds: []posix.pollfd, pollfds: []posix.pollfd,
listener: ?Listener = null, listener: ?Listener = null,
@@ -268,9 +273,13 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
.connections = connections, .connections = connections,
.app = app, .app = app,
.robot_store = RobotStore.init(allocator), .robot_store = RobotStore.init(allocator),
.web_bot_auth = web_bot_auth, .web_bot_auth = web_bot_auth,
.cache = cache, .cache = cache,
.ws_pool = .init(allocator),
.ws_max = config.wsMaxConcurrent(),
}; };
} }
@@ -298,6 +307,8 @@ pub fn deinit(self: *Network) void {
} }
self.allocator.free(self.connections); self.allocator.free(self.connections);
self.ws_pool.deinit();
self.robot_store.deinit(); self.robot_store.deinit();
if (self.web_bot_auth) |wba| { if (self.web_bot_auth) |wba| {
wba.deinit(self.allocator); wba.deinit(self.allocator);
@@ -592,18 +603,50 @@ pub fn getConnection(self: *Network) ?*http.Connection {
} }
pub fn releaseConnection(self: *Network, conn: *http.Connection) void { pub fn releaseConnection(self: *Network, conn: *http.Connection) void {
switch (conn.transport) {
.websocket => {
conn.deinit();
self.ws_mutex.lock();
defer self.ws_mutex.unlock();
self.ws_pool.destroy(conn);
self.ws_count -= 1;
},
else => {
conn.reset(self.config, self.ca_blob) catch |err| { conn.reset(self.config, self.ca_blob) catch |err| {
lp.assert(false, "couldn't reset curl easy", .{ .err = err }); lp.assert(false, "couldn't reset curl easy", .{ .err = err });
}; };
self.conn_mutex.lock(); self.conn_mutex.lock();
defer self.conn_mutex.unlock(); defer self.conn_mutex.unlock();
self.available.append(&conn.node); self.available.append(&conn.node);
},
}
} }
pub fn newConnection(self: *Network) !http.Connection { pub fn newConnection(self: *Network) ?*http.Connection {
return http.Connection.init(self.ca_blob, self.config); const conn = blk: {
self.ws_mutex.lock();
defer self.ws_mutex.unlock();
if (self.ws_count >= self.ws_max) {
return null;
}
const c = self.ws_pool.create() catch return null;
self.ws_count += 1;
break :blk c;
};
// don't do this under lock
conn.* = http.Connection.init(self.ca_blob, self.config) catch {
self.ws_mutex.lock();
defer self.ws_mutex.unlock();
self.ws_pool.destroy(conn);
self.ws_count -= 1;
return null;
};
return conn;
} }
// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is // Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is

View File

@@ -496,6 +496,7 @@ test "tests:beforeAll" {
.common = .{ .common = .{
.tls_verify_host = false, .tls_verify_host = false,
.user_agent_suffix = "internal-tester", .user_agent_suffix = "internal-tester",
.ws_max_concurrent = 50,
}, },
} }); } });