Merge pull request #1353 from lightpanda-io/wp/mrdimidium/multicontext

Use thread per connection
This commit is contained in:
Pierre Tachoire
2026-02-18 14:59:15 +01:00
committed by GitHub
15 changed files with 389 additions and 197 deletions

View File

@@ -122,8 +122,8 @@ jobs:
needs: zig-build-release needs: zig-build-release
env: env:
MAX_VmHWM: 26000 # 26MB (KB) MAX_VmHWM: 28000 # 28MB (KB)
MAX_CG_PEAK: 6000 # 6MB (KB) MAX_CG_PEAK: 8000 # 8MB (KB)
MAX_AVG_DURATION: 17 MAX_AVG_DURATION: 17
LIGHTPANDA_DISABLE_TELEMETRY: true LIGHTPANDA_DISABLE_TELEMETRY: true

View File

@@ -29,6 +29,7 @@ free_list_len: u16 = 0,
free_list: ?*Entry = null, free_list: ?*Entry = null,
free_list_max: u16, free_list_max: u16,
entry_pool: std.heap.MemoryPool(Entry), entry_pool: std.heap.MemoryPool(Entry),
mutex: std.Thread.Mutex = .{},
const Entry = struct { const Entry = struct {
next: ?*Entry, next: ?*Entry,
@@ -54,6 +55,9 @@ pub fn deinit(self: *ArenaPool) void {
} }
pub fn acquire(self: *ArenaPool) !Allocator { pub fn acquire(self: *ArenaPool) !Allocator {
self.mutex.lock();
defer self.mutex.unlock();
if (self.free_list) |entry| { if (self.free_list) |entry| {
self.free_list = entry.next; self.free_list = entry.next;
self.free_list_len -= 1; self.free_list_len -= 1;
@@ -73,6 +77,12 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void {
const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr)); const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr));
const entry: *Entry = @fieldParentPtr("arena", arena); const entry: *Entry = @fieldParentPtr("arena", arena);
// Reset the arena before acquiring the lock to minimize lock hold time
_ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
self.mutex.lock();
defer self.mutex.unlock();
const free_list_len = self.free_list_len; const free_list_len = self.free_list_len;
if (free_list_len == self.free_list_max) { if (free_list_len == self.free_list_max) {
arena.deinit(); arena.deinit();
@@ -80,7 +90,6 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void {
return; return;
} }
_ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
entry.next = self.free_list; entry.next = self.free_list;
self.free_list_len = free_list_len + 1; self.free_list_len = free_list_len + 1;
self.free_list = entry; self.free_list = entry;

View File

@@ -30,6 +30,13 @@ pub const RunMode = enum {
version, version,
}; };
pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096;
// max message size
// +14 for max websocket payload overhead
// +140 for the max control packet that might be interleaved in a message
pub const CDP_MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
mode: Mode, mode: Mode,
exec_name: []const u8, exec_name: []const u8,
http_headers: HttpHeaders, http_headers: HttpHeaders,
@@ -145,6 +152,20 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 {
}; };
} }
pub fn maxConnections(self: *const Config) u16 {
return switch (self.mode) {
.serve => |opts| opts.cdp_max_connections,
else => unreachable,
};
}
pub fn maxPendingConnections(self: *const Config) u31 {
return switch (self.mode) {
.serve => |opts| opts.cdp_max_pending_connections,
else => unreachable,
};
}
pub const Mode = union(RunMode) { pub const Mode = union(RunMode) {
help: bool, // false when being printed because of an error help: bool, // false when being printed because of an error
fetch: Fetch, fetch: Fetch,
@@ -156,10 +177,8 @@ pub const Serve = struct {
host: []const u8 = "127.0.0.1", host: []const u8 = "127.0.0.1",
port: u16 = 9222, port: u16 = 9222,
timeout: u31 = 10, timeout: u31 = 10,
max_connections: u16 = 16, cdp_max_connections: u16 = 16,
max_tabs_per_connection: u16 = 8, cdp_max_pending_connections: u16 = 128,
max_memory_per_tab: u64 = 512 * 1024 * 1024,
max_pending_connections: u16 = 128,
common: Common = .{}, common: Common = .{},
}; };
@@ -333,18 +352,11 @@ pub fn printUsageAndExit(self: *const Config, success: bool) void {
\\--timeout Inactivity timeout in seconds before disconnecting clients \\--timeout Inactivity timeout in seconds before disconnecting clients
\\ Defaults to 10 (seconds). Limited to 604800 (1 week). \\ Defaults to 10 (seconds). Limited to 604800 (1 week).
\\ \\
\\--max_connections \\--cdp_max_connections
\\ Maximum number of simultaneous CDP connections. \\ Maximum number of simultaneous CDP connections.
\\ Defaults to 16. \\ Defaults to 16.
\\ \\
\\--max_tabs Maximum number of tabs per CDP connection. \\--cdp_max_pending_connections
\\ Defaults to 8.
\\
\\--max_tab_memory
\\ Maximum memory per tab in bytes.
\\ Defaults to 536870912 (512 MB).
\\
\\--max_pending_connections
\\ Maximum pending connections in the accept queue. \\ Maximum pending connections in the accept queue.
\\ Defaults to 128. \\ Defaults to 128.
\\ \\
@@ -479,53 +491,27 @@ fn parseServeArgs(
continue; continue;
} }
if (std.mem.eql(u8, "--max_connections", opt)) { if (std.mem.eql(u8, "--cdp_max_connections", opt)) {
const str = args.next() orelse { const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = "--max_connections" }); log.fatal(.app, "missing argument value", .{ .arg = "--cdp_max_connections" });
return error.InvalidArgument; return error.InvalidArgument;
}; };
serve.max_connections = std.fmt.parseInt(u16, str, 10) catch |err| { serve.cdp_max_connections = std.fmt.parseInt(u16, str, 10) catch |err| {
log.fatal(.app, "invalid argument value", .{ .arg = "--max_connections", .err = err }); log.fatal(.app, "invalid argument value", .{ .arg = "--cdp_max_connections", .err = err });
return error.InvalidArgument; return error.InvalidArgument;
}; };
continue; continue;
} }
if (std.mem.eql(u8, "--max_tabs", opt)) { if (std.mem.eql(u8, "--cdp_max_pending_connections", opt)) {
const str = args.next() orelse { const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = "--max_tabs" }); log.fatal(.app, "missing argument value", .{ .arg = "--cdp_max_pending_connections" });
return error.InvalidArgument; return error.InvalidArgument;
}; };
serve.max_tabs_per_connection = std.fmt.parseInt(u16, str, 10) catch |err| { serve.cdp_max_pending_connections = std.fmt.parseInt(u16, str, 10) catch |err| {
log.fatal(.app, "invalid argument value", .{ .arg = "--max_tabs", .err = err }); log.fatal(.app, "invalid argument value", .{ .arg = "--cdp_max_pending_connections", .err = err });
return error.InvalidArgument;
};
continue;
}
if (std.mem.eql(u8, "--max_tab_memory", opt)) {
const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = "--max_tab_memory" });
return error.InvalidArgument;
};
serve.max_memory_per_tab = std.fmt.parseInt(u64, str, 10) catch |err| {
log.fatal(.app, "invalid argument value", .{ .arg = "--max_tab_memory", .err = err });
return error.InvalidArgument;
};
continue;
}
if (std.mem.eql(u8, "--max_pending_connections", opt)) {
const str = args.next() orelse {
log.fatal(.app, "missing argument value", .{ .arg = "--max_pending_connections" });
return error.InvalidArgument;
};
serve.max_pending_connections = std.fmt.parseInt(u16, str, 10) catch |err| {
log.fatal(.app, "invalid argument value", .{ .arg = "--max_pending_connections", .err = err });
return error.InvalidArgument; return error.InvalidArgument;
}; };
continue; continue;

View File

@@ -28,23 +28,25 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig"); const log = @import("log.zig");
const App = @import("App.zig"); const App = @import("App.zig");
const Config = @import("Config.zig");
const CDP = @import("cdp/cdp.zig").CDP; const CDP = @import("cdp/cdp.zig").CDP;
const Http = @import("http/Http.zig");
const MAX_HTTP_REQUEST_SIZE = 4096; const HttpClient = @import("http/Client.zig");
// max message size
// +14 for max websocket payload overhead
// +140 for the max control packet that might be interleaved in a message
const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
const Server = @This(); const Server = @This();
app: *App, app: *App,
shutdown: bool = false, shutdown: std.atomic.Value(bool) = .init(false),
allocator: Allocator, allocator: Allocator,
client: ?posix.socket_t,
listener: ?posix.socket_t, listener: ?posix.socket_t,
json_version_response: []const u8, json_version_response: []const u8,
// Thread management
active_threads: std.atomic.Value(u32) = .init(0),
clients: std.ArrayList(*Client) = .{},
client_mutex: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client),
pub fn init(app: *App, address: net.Address) !Server { pub fn init(app: *App, address: net.Address) !Server {
const allocator = app.allocator; const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address); const json_version_response = try buildJSONVersionResponse(allocator, address);
@@ -52,19 +54,28 @@ pub fn init(app: *App, address: net.Address) !Server {
return .{ return .{
.app = app, .app = app,
.client = null,
.listener = null, .listener = null,
.allocator = allocator, .allocator = allocator,
.json_version_response = json_version_response, .json_version_response = json_version_response,
.clients_pool = std.heap.MemoryPool(Client).init(app.allocator),
}; };
} }
/// Interrupts the server so that main can complete normally and call all defer handlers. /// Interrupts the server so that main can complete normally and call all defer handlers.
pub fn stop(self: *Server) void { pub fn stop(self: *Server) void {
if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { if (self.shutdown.swap(true, .release)) {
return; return;
} }
// Shutdown all active clients
{
self.client_mutex.lock();
defer self.client_mutex.unlock();
for (self.clients.items) |client| {
client.stop();
}
}
// Linux and BSD/macOS handle canceling a socket blocked on accept differently. // Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF).
@@ -81,17 +92,22 @@ pub fn stop(self: *Server) void {
} }
pub fn deinit(self: *Server) void { pub fn deinit(self: *Server) void {
if (!self.shutdown.load(.acquire)) {
self.stop();
}
self.joinThreads();
if (self.listener) |listener| { if (self.listener) |listener| {
posix.close(listener); posix.close(listener);
self.listener = null; self.listener = null;
} }
// *if* server.run is running, we should really wait for it to return self.clients.deinit(self.allocator);
// before exiting from here. self.clients_pool.deinit();
self.allocator.free(self.json_version_response); self.allocator.free(self.json_version_response);
} }
pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
self.listener = listener; self.listener = listener;
@@ -101,16 +117,20 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
} }
try posix.bind(listener, &address.any, address.getOsSockLen()); try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 1); try posix.listen(listener, self.app.config.maxPendingConnections());
log.info(.app, "server running", .{ .address = address }); log.info(.app, "server running", .{ .address = address });
while (!@atomicLoad(bool, &self.shutdown, .monotonic)) { while (!self.shutdown.load(.acquire)) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
switch (err) { switch (err) {
error.SocketNotListening, error.ConnectionAborted => { error.SocketNotListening, error.ConnectionAborted => {
log.info(.app, "server stopped", .{}); log.info(.app, "server stopped", .{});
break; break;
}, },
error.WouldBlock => {
std.Thread.sleep(10 * std.time.ns_per_ms);
continue;
},
else => { else => {
log.err(.app, "CDP accept", .{ .err = err }); log.err(.app, "CDP accept", .{ .err = err });
std.Thread.sleep(std.time.ns_per_s); std.Thread.sleep(std.time.ns_per_s);
@@ -119,96 +139,121 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
} }
}; };
self.client = socket; self.spawnWorker(socket, timeout_ms) catch |err| {
defer if (self.client) |s| { log.err(.app, "CDP spawn", .{ .err = err });
posix.close(s); posix.close(socket);
self.client = null;
};
if (log.enabled(.app, .info)) {
var client_address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(net.Address);
try std.posix.getsockname(socket, &client_address.any, &socklen);
log.info(.app, "client connected", .{ .ip = client_address });
}
self.readLoop(socket, timeout_ms) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
}; };
} }
} }
fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
// This shouldn't be necessary, but the Client is HUGE (> 512KB) because defer posix.close(socket);
// it has a large read buffer. I don't know why, but v8 crashes if this
// is on the stack (and I assume it's related to its size).
const client = try self.allocator.create(Client);
defer self.allocator.destroy(client);
client.* = try Client.init(socket, self); // Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = self.getClient() catch |err| {
log.err(.app, "CDP client create", .{ .err = err });
return;
};
defer self.releaseClient(client);
client.* = Client.init(
socket,
self.allocator,
self.app,
self.json_version_response,
timeout_ms,
) catch |err| {
log.err(.app, "CDP client init", .{ .err = err });
return;
};
defer client.deinit(); defer client.deinit();
var http = &self.app.http; self.registerClient(client);
http.addCDPClient(.{ defer self.unregisterClient(client);
.socket = socket,
.ctx = client,
.blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop,
});
defer http.removeCDPClient();
lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); // Check shutdown after registering to avoid missing stop() signal.
while (true) { // If stop() already iterated over clients, this client won't receive stop()
if (http.poll(timeout_ms) != .cdp_socket) { // and would block joinThreads() indefinitely.
log.info(.app, "CDP timeout", .{}); if (self.shutdown.load(.acquire)) {
return; return;
}
if (client.readSocket() == false) {
return;
}
if (client.mode == .cdp) {
break; // switch to our CDP loop
}
} }
var cdp = &client.mode.cdp; client.start();
var last_message = timestamp(.monotonic); }
var ms_remaining = timeout_ms;
while (true) { fn getClient(self: *Server) !*Client {
switch (cdp.pageWait(ms_remaining)) { self.client_mutex.lock();
.cdp_socket => { defer self.client_mutex.unlock();
if (client.readSocket() == false) { return self.clients_pool.create();
return; }
}
last_message = timestamp(.monotonic); fn releaseClient(self: *Server, client: *Client) void {
ms_remaining = timeout_ms; self.client_mutex.lock();
}, defer self.client_mutex.unlock();
.no_page => { self.clients_pool.destroy(client);
if (http.poll(ms_remaining) != .cdp_socket) { }
log.info(.app, "CDP timeout", .{});
return; fn registerClient(self: *Server, client: *Client) void {
} self.client_mutex.lock();
if (client.readSocket() == false) { defer self.client_mutex.unlock();
return; self.clients.append(self.allocator, client) catch {};
} }
last_message = timestamp(.monotonic);
ms_remaining = timeout_ms; fn unregisterClient(self: *Server, client: *Client) void {
}, self.client_mutex.lock();
.done => { defer self.client_mutex.unlock();
const elapsed = timestamp(.monotonic) - last_message; for (self.clients.items, 0..) |c, i| {
if (elapsed > ms_remaining) { if (c == client) {
log.info(.app, "CDP timeout", .{}); _ = self.clients.swapRemove(i);
return; break;
}
ms_remaining -= @intCast(elapsed);
},
} }
} }
} }
fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
if (self.shutdown.load(.acquire)) {
return error.ShuttingDown;
}
// Atomically increment active_threads only if below max_connections.
// Uses CAS loop to avoid race between checking the limit and incrementing.
//
// cmpxchgWeak may fail for two reasons:
// 1. Another thread changed the value (increment or decrement)
// 2. Spurious failure on some architectures (e.g. ARM)
//
// We use Weak instead of Strong because we need a retry loop anyway:
// if CAS fails because a thread finished (counter decreased), we should
// retry rather than return an error - there may now be room for a new connection.
//
// On failure, cmpxchgWeak returns the actual value, which we reuse to avoid
// an extra load on the next iteration.
const max_connections = self.app.config.maxConnections();
var current = self.active_threads.load(.monotonic);
while (current < max_connections) {
current = self.active_threads.cmpxchgWeak(current, current + 1, .monotonic, .monotonic) orelse break;
} else {
return error.MaxThreadsReached;
}
errdefer _ = self.active_threads.fetchSub(1, .monotonic);
const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms });
thread.detach();
}
fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer _ = self.active_threads.fetchSub(1, .monotonic);
handleConnection(self, socket, timeout_ms);
}
fn joinThreads(self: *Server) void {
while (self.active_threads.load(.monotonic) > 0) {
std.Thread.sleep(10 * std.time.ns_per_ms);
}
}
// Handle exactly one TCP connection.
pub const Client = struct { pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances // The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connection // should eventually be upgraded to a websocket connection
@@ -217,11 +262,15 @@ pub const Client = struct {
cdp: CDP, cdp: CDP,
}, },
server: *Server, allocator: Allocator,
app: *App,
http: *HttpClient,
json_version_response: []const u8,
reader: Reader(true), reader: Reader(true),
socket: posix.socket_t, socket: posix.socket_t,
socket_flags: usize, socket_flags: usize,
send_arena: ArenaAllocator, send_arena: ArenaAllocator,
timeout_ms: u32,
const EMPTY_PONG = [_]u8{ 138, 0 }; const EMPTY_PONG = [_]u8{ 138, 0 };
@@ -232,25 +281,49 @@ pub const Client = struct {
// "private-use" close codes must be from 4000-4999 // "private-use" close codes must be from 4000-4999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
fn init(socket: posix.socket_t, server: *Server) !Client { fn init(
socket: posix.socket_t,
allocator: Allocator,
app: *App,
json_version_response: []const u8,
timeout_ms: u32,
) !Client {
if (log.enabled(.app, .info)) {
var client_address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(net.Address);
try std.posix.getsockname(socket, &client_address.any, &socklen);
log.info(.app, "client connected", .{ .ip = client_address });
}
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking // we expect the socket to come to us as nonblocking
lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{}); lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
var reader = try Reader(true).init(server.allocator); var reader = try Reader(true).init(allocator);
errdefer reader.deinit(); errdefer reader.deinit();
const http = try app.http.createClient(allocator);
errdefer http.deinit();
return .{ return .{
.socket = socket, .socket = socket,
.server = server, .allocator = allocator,
.app = app,
.http = http,
.json_version_response = json_version_response,
.reader = reader, .reader = reader,
.mode = .{ .http = {} }, .mode = .{ .http = {} },
.socket_flags = socket_flags, .socket_flags = socket_flags,
.send_arena = ArenaAllocator.init(server.allocator), .send_arena = ArenaAllocator.init(allocator),
.timeout_ms = timeout_ms,
}; };
} }
fn stop(self: *Client) void {
posix.shutdown(self.socket, .recv) catch {};
}
fn deinit(self: *Client) void { fn deinit(self: *Client) void {
switch (self.mode) { switch (self.mode) {
.cdp => |*cdp| cdp.deinit(), .cdp => |*cdp| cdp.deinit(),
@@ -258,6 +331,88 @@ pub const Client = struct {
} }
self.reader.deinit(); self.reader.deinit();
self.send_arena.deinit(); self.send_arena.deinit();
self.http.deinit();
}
fn start(self: *Client) void {
const http = self.http;
http.cdp_client = .{
.socket = self.socket,
.ctx = self,
.blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop,
};
defer http.cdp_client = null;
self.httpLoop(http) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
fn httpLoop(self: *Client, http: *HttpClient) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
while (true) {
const status = http.tick(self.timeout_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
if (self.readSocket() == false) {
return;
}
if (self.mode == .cdp) {
break;
}
}
return self.cdpLoop(http);
}
fn cdpLoop(self: *Client, http: *HttpClient) !void {
var cdp = &self.mode.cdp;
var last_message = timestamp(.monotonic);
var ms_remaining = self.timeout_ms;
while (true) {
switch (cdp.pageWait(ms_remaining)) {
.cdp_socket => {
if (self.readSocket() == false) {
return;
}
last_message = timestamp(.monotonic);
ms_remaining = self.timeout_ms;
},
.no_page => {
const status = http.tick(ms_remaining) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
if (self.readSocket() == false) {
return;
}
last_message = timestamp(.monotonic);
ms_remaining = self.timeout_ms;
},
.done => {
const elapsed = timestamp(.monotonic) - last_message;
if (elapsed > ms_remaining) {
log.info(.app, "CDP timeout", .{});
return;
}
ms_remaining -= @intCast(elapsed);
},
}
}
} }
fn blockingReadStart(ctx: *anyopaque) bool { fn blockingReadStart(ctx: *anyopaque) bool {
@@ -314,7 +469,7 @@ pub const Client = struct {
lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos }); lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos });
const request = self.reader.buf[0..self.reader.len]; const request = self.reader.buf[0..self.reader.len];
if (request.len > MAX_HTTP_REQUEST_SIZE) { if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) {
self.writeHTTPErrorResponse(413, "Request too large"); self.writeHTTPErrorResponse(413, "Request too large");
return error.RequestTooLarge; return error.RequestTooLarge;
} }
@@ -367,7 +522,7 @@ pub const Client = struct {
} }
if (std.mem.eql(u8, url, "/json/version")) { if (std.mem.eql(u8, url, "/json/version")) {
try self.send(self.server.json_version_response); try self.send(self.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version // Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection. // then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the // Since we only allow 1 connection at a time, the 2nd one (the
@@ -472,7 +627,7 @@ pub const Client = struct {
break :blk res; break :blk res;
}; };
self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) };
return self.send(response); return self.send(response);
} }
@@ -707,7 +862,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
if (message_len > 125) { if (message_len > 125) {
return error.ControlTooLarge; return error.ControlTooLarge;
} }
} else if (message_len > MAX_MESSAGE_SIZE) { } else if (message_len > Config.CDP_MAX_MESSAGE_SIZE) {
return error.TooLarge; return error.TooLarge;
} else if (message_len > self.buf.len) { } else if (message_len > self.buf.len) {
const len = self.buf.len; const len = self.buf.len;
@@ -735,7 +890,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
if (is_continuation) { if (is_continuation) {
const fragments = &(self.fragments orelse return error.InvalidContinuation); const fragments = &(self.fragments orelse return error.InvalidContinuation);
if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) { if (fragments.message.items.len + message_len > Config.CDP_MAX_MESSAGE_SIZE) {
return error.TooLarge; return error.TooLarge;
} }

View File

@@ -24,9 +24,9 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const js = @import("js/js.zig"); const js = @import("js/js.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const HttpClient = @import("../http/Client.zig");
const ArenaPool = App.ArenaPool; const ArenaPool = App.ArenaPool;
const HttpClient = App.Http.Client;
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -47,6 +47,7 @@ http_client: *HttpClient,
const InitOpts = struct { const InitOpts = struct {
env: js.Env.InitOpts = .{}, env: js.Env.InitOpts = .{},
http_client: *HttpClient,
}; };
pub fn init(app: *App, opts: InitOpts) !Browser { pub fn init(app: *App, opts: InitOpts) !Browser {
@@ -61,7 +62,7 @@ pub fn init(app: *App, opts: InitOpts) !Browser {
.session = null, .session = null,
.allocator = allocator, .allocator = allocator,
.arena_pool = &app.arena_pool, .arena_pool = &app.arena_pool,
.http_client = app.http.client, .http_client = opts.http_client,
}; };
} }

View File

@@ -111,12 +111,16 @@ pub const RobotStore = struct {
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
map: RobotsMap, map: RobotsMap,
mutex: std.Thread.Mutex = .{},
pub fn init(allocator: std.mem.Allocator) RobotStore { pub fn init(allocator: std.mem.Allocator) RobotStore {
return .{ .allocator = allocator, .map = .empty }; return .{ .allocator = allocator, .map = .empty };
} }
pub fn deinit(self: *RobotStore) void { pub fn deinit(self: *RobotStore) void {
self.mutex.lock();
defer self.mutex.unlock();
var iter = self.map.iterator(); var iter = self.map.iterator();
while (iter.next()) |entry| { while (iter.next()) |entry| {
@@ -132,6 +136,9 @@ pub const RobotStore = struct {
} }
pub fn get(self: *RobotStore, url: []const u8) ?RobotsEntry { pub fn get(self: *RobotStore, url: []const u8) ?RobotsEntry {
self.mutex.lock();
defer self.mutex.unlock();
return self.map.get(url); return self.map.get(url);
} }
@@ -140,11 +147,17 @@ pub const RobotStore = struct {
} }
pub fn put(self: *RobotStore, url: []const u8, robots: Robots) !void { pub fn put(self: *RobotStore, url: []const u8, robots: Robots) !void {
self.mutex.lock();
defer self.mutex.unlock();
const duped = try self.allocator.dupe(u8, url); const duped = try self.allocator.dupe(u8, url);
try self.map.put(self.allocator, duped, .{ .present = robots }); try self.map.put(self.allocator, duped, .{ .present = robots });
} }
pub fn putAbsent(self: *RobotStore, url: []const u8) !void { pub fn putAbsent(self: *RobotStore, url: []const u8) !void {
self.mutex.lock();
defer self.mutex.unlock();
const duped = try self.allocator.dupe(u8, url); const duped = try self.allocator.dupe(u8, url);
try self.map.put(self.allocator, duped, .absent); try self.map.put(self.allocator, duped, .absent);
} }

View File

@@ -39,6 +39,14 @@ const JsApis = bridge.JsApis;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const IS_DEBUG = builtin.mode == .Debug; const IS_DEBUG = builtin.mode == .Debug;
fn initClassIds() void {
inline for (JsApis, 0..) |JsApi, i| {
JsApi.Meta.class_id = i;
}
}
var class_id_once = std.once(initClassIds);
// The Env maps to a V8 isolate, which represents a isolated sandbox for // The Env maps to a V8 isolate, which represents a isolated sandbox for
// executing JavaScript. The Env is where we'll define our V8 <-> Zig bindings, // executing JavaScript. The Env is where we'll define our V8 <-> Zig bindings,
// and it's where we'll start ExecutionWorlds, which actually execute JavaScript. // and it's where we'll start ExecutionWorlds, which actually execute JavaScript.
@@ -90,6 +98,9 @@ pub fn init(app: *App, opts: InitOpts) !Env {
} }
} }
// Initialize class IDs once before any V8 work
class_id_once.call();
const allocator = app.allocator; const allocator = app.allocator;
const snapshot = &app.snapshot; const snapshot = &app.snapshot;
@@ -132,8 +143,7 @@ pub fn init(app: *App, opts: InitOpts) !Env {
temp_scope.init(isolate); temp_scope.init(isolate);
defer temp_scope.deinit(); defer temp_scope.deinit();
inline for (JsApis, 0..) |JsApi, i| { inline for (JsApis, 0..) |_, i| {
JsApi.Meta.class_id = i;
const data = v8.v8__Isolate__GetDataFromSnapshotOnce(isolate_handle, snapshot.data_start + i); const data = v8.v8__Isolate__GetDataFromSnapshotOnce(isolate_handle, snapshot.data_start + i);
const function_handle: *const v8.FunctionTemplate = @ptrCast(data); const function_handle: *const v8.FunctionTemplate = @ptrCast(data);
// Make function template eternal // Make function template eternal

View File

@@ -205,7 +205,8 @@ fn httpShutdownCallback(ctx: *anyopaque) void {
var response = self._response; var response = self._response;
response._transfer = null; response._transfer = null;
response.deinit(true); response.deinit(true);
self._owns_response = false; // Do not access `self` after this point: the Fetch struct was
// allocated from response._arena which has been released.
} }
} }

View File

@@ -28,6 +28,7 @@ const js = @import("../browser/js/js.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const Browser = @import("../browser/Browser.zig"); const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig"); const Session = @import("../browser/Session.zig");
const HttpClient = @import("../http/Client.zig");
const Page = @import("../browser/Page.zig"); const Page = @import("../browser/Page.zig");
const Incrementing = @import("../id.zig").Incrementing; const Incrementing = @import("../id.zig").Incrementing;
const Notification = @import("../Notification.zig"); const Notification = @import("../Notification.zig");
@@ -84,10 +85,11 @@ pub fn CDPT(comptime TypeProvider: type) type {
const Self = @This(); const Self = @This();
pub fn init(app: *App, client: TypeProvider.Client) !Self { pub fn init(app: *App, http_client: *HttpClient, client: TypeProvider.Client) !Self {
const allocator = app.allocator; const allocator = app.allocator;
const browser = try Browser.init(app, .{ const browser = try Browser.init(app, .{
.env = .{ .with_inspector = true }, .env = .{ .with_inspector = true },
.http_client = http_client,
}); });
errdefer browser.deinit(); errdefer browser.deinit();

View File

@@ -85,7 +85,7 @@ const TestContext = struct {
self.client = Client.init(self.arena.allocator()); self.client = Client.init(self.arena.allocator());
// Don't use the arena here. We want to detect leaks in CDP. // Don't use the arena here. We want to detect leaks in CDP.
// The arena is only for test-specific stuff // The arena is only for test-specific stuff
self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; self.cdp_ = TestCDP.init(base.test_app, base.test_http, &self.client.?) catch unreachable;
} }
return &self.cdp_.?; return &self.cdp_.?;
} }

View File

@@ -17,8 +17,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const lp = @import("lightpanda");
const Config = @import("../Config.zig");
pub const c = @cImport({ pub const c = @cImport({
@cInclude("curl/curl.h"); @cInclude("curl/curl.h");
@@ -28,6 +26,8 @@ pub const ENABLE_DEBUG = false;
pub const Client = @import("Client.zig"); pub const Client = @import("Client.zig");
pub const Transfer = Client.Transfer; pub const Transfer = Client.Transfer;
const lp = @import("lightpanda");
const Config = @import("../Config.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const errors = @import("errors.zig"); const errors = @import("errors.zig");
const RobotStore = @import("../browser/Robots.zig").RobotStore; const RobotStore = @import("../browser/Robots.zig").RobotStore;
@@ -42,10 +42,11 @@ const ArenaAllocator = std.heap.ArenaAllocator;
// once for all http connections is a win. // once for all http connections is a win.
const Http = @This(); const Http = @This();
config: *const Config,
client: *Client,
ca_blob: ?c.curl_blob,
arena: ArenaAllocator, arena: ArenaAllocator,
allocator: Allocator,
config: *const Config,
ca_blob: ?c.curl_blob,
robot_store: *RobotStore,
pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http { pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http {
try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL));
@@ -60,40 +61,29 @@ pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Confi
var ca_blob: ?c.curl_blob = null; var ca_blob: ?c.curl_blob = null;
if (config.tlsVerifyHost()) { if (config.tlsVerifyHost()) {
ca_blob = try loadCerts(allocator, arena.allocator()); ca_blob = try loadCerts(allocator);
} }
var client = try Client.init(allocator, ca_blob, robot_store, config);
errdefer client.deinit();
return .{ return .{
.arena = arena, .arena = arena,
.client = client, .allocator = allocator,
.ca_blob = ca_blob,
.config = config, .config = config,
.ca_blob = ca_blob,
.robot_store = robot_store,
}; };
} }
pub fn deinit(self: *Http) void { pub fn deinit(self: *Http) void {
self.client.deinit(); if (self.ca_blob) |ca_blob| {
const data: [*]u8 = @ptrCast(ca_blob.data);
self.allocator.free(data[0..ca_blob.len]);
}
c.curl_global_cleanup(); c.curl_global_cleanup();
self.arena.deinit(); self.arena.deinit();
} }
pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { pub fn createClient(self: *Http, allocator: Allocator) !*Client {
return self.client.tick(timeout_ms) catch |err| { return Client.init(allocator, self.ca_blob, self.robot_store, self.config);
log.err(.app, "http poll", .{ .err = err });
return .normal;
};
}
pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void {
lp.assert(self.client.cdp_client == null, "Http addCDPClient existing", .{});
self.client.cdp_client = cdp_client;
}
pub fn removeCDPClient(self: *Http) void {
self.client.cdp_client = null;
} }
pub fn newConnection(self: *Http) !Connection { pub fn newConnection(self: *Http) !Connection {
@@ -351,7 +341,7 @@ pub const Method = enum(u8) {
// This whole rescan + decode is really just needed for MacOS. On Linux // This whole rescan + decode is really just needed for MacOS. On Linux
// bundle.rescan does find the .pem file(s) which could be in a few different // bundle.rescan does find the .pem file(s) which could be in a few different
// places, so it's still useful, just not efficient. // places, so it's still useful, just not efficient.
fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { fn loadCerts(allocator: Allocator) !c.curl_blob {
var bundle: std.crypto.Certificate.Bundle = .{}; var bundle: std.crypto.Certificate.Bundle = .{};
try bundle.rescan(allocator); try bundle.rescan(allocator);
defer bundle.deinit(allocator); defer bundle.deinit(allocator);
@@ -374,8 +364,9 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
(bundle.map.count() * 75) + // start / end per certificate + extra, just in case (bundle.map.count() * 75) + // start / end per certificate + extra, just in case
(encoded_size / 64) // newline per 64 characters (encoded_size / 64) // newline per 64 characters
; ;
try arr.ensureTotalCapacity(arena, buffer_size); try arr.ensureTotalCapacity(allocator, buffer_size);
var writer = arr.writer(arena); errdefer arr.deinit(allocator);
var writer = arr.writer(allocator);
var it = bundle.map.valueIterator(); var it = bundle.map.valueIterator();
while (it.next()) |index| { while (it.next()) |index| {
@@ -388,11 +379,16 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
} }
// Final encoding should not be larger than our initial size estimate // Final encoding should not be larger than our initial size estimate
lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len }); lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len });
// Allocate exactly the size needed and copy the data
const result = try allocator.dupe(u8, arr.items);
// Free the original oversized allocation
arr.deinit(allocator);
return .{ return .{
.len = arr.items.len, .len = result.len,
.data = arr.items.ptr, .data = result.ptr,
.flags = 0, .flags = 0,
}; };
} }

View File

@@ -39,10 +39,13 @@ pub const FetchOpts = struct {
writer: ?*std.Io.Writer = null, writer: ?*std.Io.Writer = null,
}; };
pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void {
const http_client = try app.http.createClient(app.allocator);
defer http_client.deinit();
const notification = try Notification.init(app.allocator); const notification = try Notification.init(app.allocator);
defer notification.deinit(); defer notification.deinit();
var browser = try Browser.init(app, .{}); var browser = try Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
var session = try browser.newSession(notification); var session = try browser.newSession(notification);

View File

@@ -46,17 +46,24 @@ pub fn main() !void {
var test_arena = std.heap.ArenaAllocator.init(allocator); var test_arena = std.heap.ArenaAllocator.init(allocator);
defer test_arena.deinit(); defer test_arena.deinit();
var browser = try lp.Browser.init(app, .{}); const http_client = try app.http.createClient(allocator);
const notification = try lp.Notification.init(app.allocator); defer http_client.deinit();
defer notification.deinit();
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
const notification = try lp.Notification.init(allocator);
defer notification.deinit();
const session = try browser.newSession(notification); const session = try browser.newSession(notification);
defer session.deinit();
var dir = try std.fs.cwd().openDir("src/browser/tests/legacy/", .{ .iterate = true, .no_follow = true }); var dir = try std.fs.cwd().openDir("src/browser/tests/legacy/", .{ .iterate = true, .no_follow = true });
defer dir.close(); defer dir.close();
var walker = try dir.walk(allocator); var walker = try dir.walk(allocator);
defer walker.deinit(); defer walker.deinit();
while (try walker.next()) |entry| { while (try walker.next()) |entry| {
_ = test_arena.reset(.retain_capacity); _ = test_arena.reset(.retain_capacity);
if (entry.kind != .file) { if (entry.kind != .file) {

View File

@@ -69,7 +69,10 @@ pub fn main() !void {
var app = try lp.App.init(allocator, &config); var app = try lp.App.init(allocator, &config);
defer app.deinit(); defer app.deinit();
var browser = try lp.Browser.init(app, .{}); const http_client = try app.http.createClient(allocator);
defer http_client.deinit();
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
// An arena for running each tests. Is reset after every test. // An arena for running each tests. Is reset after every test.

View File

@@ -39,6 +39,7 @@ pub fn reset() void {
const App = @import("App.zig"); const App = @import("App.zig");
const js = @import("browser/js/js.zig"); const js = @import("browser/js/js.zig");
const Config = @import("Config.zig"); const Config = @import("Config.zig");
const Client = @import("http/Client.zig");
const Page = @import("browser/Page.zig"); const Page = @import("browser/Page.zig");
const Browser = @import("browser/Browser.zig"); const Browser = @import("browser/Browser.zig");
const Session = @import("browser/Session.zig"); const Session = @import("browser/Session.zig");
@@ -334,6 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool {
} }
pub var test_app: *App = undefined; pub var test_app: *App = undefined;
pub var test_http: *Client = undefined;
pub var test_browser: Browser = undefined; pub var test_browser: Browser = undefined;
pub var test_notification: *Notification = undefined; pub var test_notification: *Notification = undefined;
pub var test_session: *Session = undefined; pub var test_session: *Session = undefined;
@@ -472,7 +474,10 @@ test "tests:beforeAll" {
test_app = try App.init(test_allocator, &test_config); test_app = try App.init(test_allocator, &test_config);
errdefer test_app.deinit(); errdefer test_app.deinit();
test_browser = try Browser.init(test_app, .{}); test_http = try test_app.http.createClient(test_allocator);
errdefer test_http.deinit();
test_browser = try Browser.init(test_app, .{ .http_client = test_http });
errdefer test_browser.deinit(); errdefer test_browser.deinit();
// Create notification for testing // Create notification for testing
@@ -519,6 +524,7 @@ test "tests:afterAll" {
test_notification.deinit(); test_notification.deinit();
test_browser.deinit(); test_browser.deinit();
test_http.deinit();
test_app.deinit(); test_app.deinit();
test_config.deinit(@import("root").tracking_allocator); test_config.deinit(@import("root").tracking_allocator);
} }