Implement multi-cdp architecture

Nikolay Govorov
2026-02-04 16:25:43 +00:00
parent babf8ba3e7
commit e3b631ebb3
17 changed files with 379 additions and 188 deletions

View File

@@ -189,7 +189,7 @@ fn addDependencies(
         .prebuilt_v8_path = prebuilt_v8_path,
         .is_asan = is_asan,
         .is_tsan = is_tsan,
-        .v8_enable_sandbox = is_tsan,
+        .v8_enable_sandbox = is_tsan, // v8 contains a bug and cannot be compiled with tsan without a sandbox.
     };
     mod.addIncludePath(b.path("vendor/lightpanda"));

View File

@@ -29,6 +29,7 @@ free_list_len: u16 = 0,
 free_list: ?*Entry = null,
 free_list_max: u16,
 entry_pool: std.heap.MemoryPool(Entry),
+mutex: std.Thread.Mutex,
 
 const Entry = struct {
     next: ?*Entry,
@@ -41,6 +42,7 @@ pub fn init(allocator: Allocator) ArenaPool {
         .free_list_max = 512, // TODO make configurable
         .retain_bytes = 1024 * 16, // TODO make configurable
         .entry_pool = std.heap.MemoryPool(Entry).init(allocator),
+        .mutex = .{},
     };
 }
@@ -54,6 +56,9 @@ pub fn deinit(self: *ArenaPool) void {
 }
 
 pub fn acquire(self: *ArenaPool) !Allocator {
+    self.mutex.lock();
+    defer self.mutex.unlock();
+
     if (self.free_list) |entry| {
         self.free_list = entry.next;
         self.free_list_len -= 1;
@@ -73,6 +78,12 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void {
     const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr));
     const entry: *Entry = @fieldParentPtr("arena", arena);
 
+    // Reset the arena before acquiring the lock to minimize lock hold time
+    _ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
+
+    self.mutex.lock();
+    defer self.mutex.unlock();
+
     const free_list_len = self.free_list_len;
     if (free_list_len == self.free_list_max) {
         arena.deinit();
@@ -80,7 +91,6 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void {
         return;
     }
 
-    _ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
     entry.next = self.free_list;
     self.free_list_len = free_list_len + 1;
     self.free_list = entry;
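
Note: the release path above resets the arena before taking the pool mutex. A rough, self-contained sketch of that pattern (std-only; this is not the project's ArenaPool API, names are illustrative):

    const std = @import("std");

    // Do the potentially expensive arena reset outside the lock, then take the
    // mutex only long enough to push the arena back onto the free list.
    fn releaseArena(
        mutex: *std.Thread.Mutex,
        free_list: *std.ArrayListUnmanaged(*std.heap.ArenaAllocator),
        gpa: std.mem.Allocator,
        arena: *std.heap.ArenaAllocator,
    ) void {
        _ = arena.reset(.{ .retain_with_limit = 16 * 1024 }); // no lock held here
        mutex.lock();
        defer mutex.unlock();
        free_list.append(gpa, arena) catch arena.deinit(); // drop it if the list can't grow
    }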

View File

@@ -30,6 +30,13 @@ pub const RunMode = enum {
     version,
 };
 
+pub const MAX_HTTP_REQUEST_SIZE = 4096;
+
+// max message size
+// +14 for max websocket payload overhead
+// +140 for the max control packet that might be interleaved in a message
+pub const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
+
 mode: Mode,
 exec_name: []const u8,
 http_headers: HttpHeaders,
@@ -131,6 +138,20 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 {
     };
 }
 
+pub fn maxConnections(self: *const Config) u16 {
+    return switch (self.mode) {
+        .serve => |opts| opts.max_connections,
+        else => unreachable,
+    };
+}
+
+pub fn maxPendingConnections(self: *const Config) u31 {
+    return switch (self.mode) {
+        .serve => |opts| opts.max_pending_connections,
+        else => unreachable,
+    };
+}
+
 pub const Mode = union(RunMode) {
     help: bool, // false when being printed because of an error
     fetch: Fetch,
@@ -144,7 +165,6 @@ pub const Serve = struct {
     timeout: u31 = 10,
     max_connections: u16 = 16,
     max_tabs_per_connection: u16 = 8,
-    max_memory_per_tab: u64 = 512 * 1024 * 1024,
     max_pending_connections: u16 = 128,
     common: Common = .{},
 };
@@ -479,19 +499,6 @@ fn parseServeArgs(
             continue;
         }
 
-        if (std.mem.eql(u8, "--max_tab_memory", opt)) {
-            const str = args.next() orelse {
-                log.fatal(.app, "missing argument value", .{ .arg = "--max_tab_memory" });
-                return error.InvalidArgument;
-            };
-            serve.max_memory_per_tab = std.fmt.parseInt(u64, str, 10) catch |err| {
-                log.fatal(.app, "invalid argument value", .{ .arg = "--max_tab_memory", .err = err });
-                return error.InvalidArgument;
-            };
-            continue;
-        }
-
         if (std.mem.eql(u8, "--max_pending_connections", opt)) {
             const str = args.next() orelse {
                 log.fatal(.app, "missing argument value", .{ .arg = "--max_pending_connections" });

View File

@@ -28,43 +28,51 @@ const ArenaAllocator = std.heap.ArenaAllocator;
 const log = @import("log.zig");
 const App = @import("App.zig");
+const Config = @import("Config.zig");
 const CDP = @import("cdp/cdp.zig").CDP;
-
-const MAX_HTTP_REQUEST_SIZE = 4096;
-
-// max message size
-// +14 for max websocket payload overhead
-// +140 for the max control packet that might be interleaved in a message
-const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
+const Http = @import("http/Http.zig");
+const HttpClient = @import("http/Client.zig");
 
 const Server = @This();
 
 app: *App,
-shutdown: bool = false,
+shutdown: std.atomic.Value(bool) = .init(false),
 allocator: Allocator,
-client: ?posix.socket_t,
 listener: ?posix.socket_t,
 json_version_response: []const u8,
 
-pub fn init(app: *App, address: net.Address) !Server {
-    const allocator = app.allocator;
+// Thread management
+active_threads: std.atomic.Value(u32) = .init(0),
+clients: std.ArrayListUnmanaged(*Client) = .{},
+clients_mu: std.Thread.Mutex = .{},
+clients_pool: std.heap.MemoryPool(Client),
+
+pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server {
     const json_version_response = try buildJSONVersionResponse(allocator, address);
     errdefer allocator.free(json_version_response);
 
     return .{
         .app = app,
-        .client = null,
         .listener = null,
         .allocator = allocator,
         .json_version_response = json_version_response,
+        .clients_pool = std.heap.MemoryPool(Client).init(allocator),
     };
 }
 
 /// Interrupts the server so that main can complete normally and call all defer handlers.
 pub fn stop(self: *Server) void {
-    if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) {
+    if (self.shutdown.swap(true, .release)) {
         return;
     }
 
+    // Shutdown all active clients
+    self.clients_mu.lock();
+    for (self.clients.items) |client| {
+        client.stop();
+    }
+    self.clients_mu.unlock();
+
     // Linux and BSD/macOS handle canceling a socket blocked on accept differently.
     // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
    // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF).
@@ -81,17 +89,18 @@ pub fn stop(self: *Server) void {
 }
 
 pub fn deinit(self: *Server) void {
+    self.joinThreads();
+
     if (self.listener) |listener| {
         posix.close(listener);
         self.listener = null;
     }
 
-    // *if* server.run is running, we should really wait for it to return
-    // before existing from here.
+    self.clients.deinit(self.allocator);
+    self.clients_pool.deinit();
+
     self.allocator.free(self.json_version_response);
 }
 
 pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
-    const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
+    const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
     const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
     self.listener = listener;
@@ -101,16 +110,20 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
     }
 
     try posix.bind(listener, &address.any, address.getOsSockLen());
-    try posix.listen(listener, 1);
+    try posix.listen(listener, self.app.config.maxPendingConnections());
 
     log.info(.app, "server running", .{ .address = address });
-    while (!@atomicLoad(bool, &self.shutdown, .monotonic)) {
+    while (!self.shutdown.load(.acquire)) {
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
            switch (err) {
                error.SocketNotListening, error.ConnectionAborted => {
                    log.info(.app, "server stopped", .{});
                    break;
                },
+               error.WouldBlock => {
+                   std.Thread.sleep(10 * std.time.ns_per_ms);
+                   continue;
+               },
                else => {
                    log.err(.app, "CDP accept", .{ .err = err });
                    std.Thread.sleep(std.time.ns_per_s);
@@ -119,97 +132,99 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
            }
        };
 
-       self.client = socket;
-       defer if (self.client) |s| {
-           posix.close(s);
-           self.client = null;
-       };
-
-       if (log.enabled(.app, .info)) {
-           var client_address: std.net.Address = undefined;
-           var socklen: posix.socklen_t = @sizeOf(net.Address);
-           try std.posix.getsockname(socket, &client_address.any, &socklen);
-           log.info(.app, "client connected", .{ .ip = client_address });
-       }
-
-       self.readLoop(socket, timeout_ms) catch |err| {
-           log.err(.app, "CDP client loop", .{ .err = err });
+       self.spawnWorker(socket, timeout_ms) catch |err| {
+           log.err(.app, "CDP spawn", .{ .err = err });
+           posix.close(socket);
        };
    }
 }
 
-fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
-    // This shouldn't be necessary, but the Client is HUGE (> 512KB) because
-    // it has a large read buffer. I don't know why, but v8 crashes if this
-    // is on the stack (and I assume it's related to its size).
-    const client = try self.allocator.create(Client);
-    defer self.allocator.destroy(client);
-
-    client.* = try Client.init(socket, self);
+fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
+    defer posix.close(socket);
+
+    // Client is HUGE (> 512KB) because it has a large read buffer.
+    // V8 crashes if this is on the stack (likely related to its size).
+    const client = self.getClient() catch |err| {
+        log.err(.app, "CDP client create", .{ .err = err });
+        return;
+    };
+    defer self.releaseClient(client);
+
+    client.* = Client.init(
+        socket,
+        self.allocator,
+        self.app,
+        self.json_version_response,
+        timeout_ms,
+    ) catch |err| {
+        log.err(.app, "CDP client init", .{ .err = err });
+        return;
+    };
     defer client.deinit();
 
-    var http = &self.app.http;
-    http.addCDPClient(.{
-        .socket = socket,
-        .ctx = client,
-        .blocking_read_start = Client.blockingReadStart,
-        .blocking_read = Client.blockingRead,
-        .blocking_read_end = Client.blockingReadStop,
-    });
-    defer http.removeCDPClient();
-
-    lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{});
-    while (true) {
-        if (http.poll(timeout_ms) != .cdp_socket) {
-            log.info(.app, "CDP timeout", .{});
-            return;
-        }
-
-        if (client.readSocket() == false) {
-            return;
-        }
-
-        if (client.mode == .cdp) {
-            break; // switch to our CDP loop
-        }
-    }
-
-    var cdp = &client.mode.cdp;
-    var last_message = timestamp(.monotonic);
-    var ms_remaining = timeout_ms;
-    while (true) {
-        switch (cdp.pageWait(ms_remaining)) {
-            .cdp_socket => {
-                if (client.readSocket() == false) {
-                    return;
-                }
-                last_message = timestamp(.monotonic);
-                ms_remaining = timeout_ms;
-            },
-            .no_page => {
-                if (http.poll(ms_remaining) != .cdp_socket) {
-                    log.info(.app, "CDP timeout", .{});
-                    return;
-                }
-                if (client.readSocket() == false) {
-                    return;
-                }
-                last_message = timestamp(.monotonic);
-                ms_remaining = timeout_ms;
-            },
-            .done => {
-                const elapsed = timestamp(.monotonic) - last_message;
-                if (elapsed > ms_remaining) {
-                    log.info(.app, "CDP timeout", .{});
-                    return;
-                }
-                ms_remaining -= @intCast(elapsed);
-            },
-            .navigate => unreachable, // must have been handled by the session
-        }
-    }
-}
+    self.registerClient(client);
+    defer self.unregisterClient(client);
+
+    client.start();
+}
+
+fn getClient(self: *Server) !*Client {
+    self.clients_mu.lock();
+    defer self.clients_mu.unlock();
+    return self.clients_pool.create();
+}
+
+fn releaseClient(self: *Server, client: *Client) void {
+    self.clients_mu.lock();
+    defer self.clients_mu.unlock();
+    self.clients_pool.destroy(client);
+}
+
+fn registerClient(self: *Server, client: *Client) void {
+    self.clients_mu.lock();
+    defer self.clients_mu.unlock();
+    self.clients.append(self.allocator, client) catch {};
+}
+
+fn unregisterClient(self: *Server, client: *Client) void {
+    self.clients_mu.lock();
+    defer self.clients_mu.unlock();
+    for (self.clients.items, 0..) |c, i| {
+        if (c == client) {
+            _ = self.clients.swapRemove(i);
+            break;
+        }
+    }
+}
+
+fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
+    if (self.shutdown.load(.acquire)) {
+        return error.ShuttingDown;
+    }
+
+    if (self.active_threads.load(.monotonic) >= self.app.config.maxConnections()) {
+        return error.MaxThreadsReached;
+    }
+
+    _ = self.active_threads.fetchAdd(1, .monotonic);
+    errdefer _ = self.active_threads.fetchSub(1, .monotonic);
+
+    const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms });
+    thread.detach();
+}
+
+fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
+    defer _ = self.active_threads.fetchSub(1, .monotonic);
+    handleConnection(self, socket, timeout_ms);
+}
+
+fn joinThreads(self: *Server) void {
+    while (self.active_threads.load(.monotonic) > 0) {
+        std.Thread.sleep(10 * std.time.ns_per_ms);
+    }
+}
+
+// Handle exactly one TCP connection.
 pub const Client = struct {
     // The client is initially serving HTTP requests but, under normal circumstances
     // should eventually be upgraded to a websocket connections
@@ -218,11 +233,15 @@ pub const Client = struct {
         cdp: CDP,
     },
 
-    server: *Server,
+    allocator: Allocator,
+    app: *App,
+    http: *HttpClient,
+    json_version_response: []const u8,
     reader: Reader(true),
     socket: posix.socket_t,
     socket_flags: usize,
     send_arena: ArenaAllocator,
+    timeout_ms: u32,
 
     const EMPTY_PONG = [_]u8{ 138, 0 };
@@ -233,22 +252,42 @@
     // "private-use" close codes must be from 4000-49999
     const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
 
-    fn init(socket: posix.socket_t, server: *Server) !Client {
+    fn init(
+        socket: posix.socket_t,
+        allocator: Allocator,
+        app: *App,
+        json_version_response: []const u8,
+        timeout_ms: u32,
+    ) !Client {
+        if (log.enabled(.app, .info)) {
+            var client_address: std.net.Address = undefined;
+            var socklen: posix.socklen_t = @sizeOf(net.Address);
+            try std.posix.getsockname(socket, &client_address.any, &socklen);
+            log.info(.app, "client connected", .{ .ip = client_address });
+        }
+
         const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
         const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
         // we expect the socket to come to us as nonblocking
         lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
 
-        var reader = try Reader(true).init(server.allocator);
+        var reader = try Reader(true).init(allocator);
         errdefer reader.deinit();
 
+        const http = try app.http.createClient(allocator);
+        errdefer http.deinit();
+
         return .{
             .socket = socket,
-            .server = server,
+            .allocator = allocator,
+            .app = app,
+            .http = http,
+            .json_version_response = json_version_response,
             .reader = reader,
             .mode = .{ .http = {} },
             .socket_flags = socket_flags,
-            .send_arena = ArenaAllocator.init(server.allocator),
+            .send_arena = ArenaAllocator.init(allocator),
+            .timeout_ms = timeout_ms,
         };
     }
@@ -259,6 +298,93 @@
         }
         self.reader.deinit();
         self.send_arena.deinit();
+        self.http.deinit();
     }
 
+    fn start(self: *Client) void {
+        const http = self.http;
+        http.cdp_client = .{
+            .socket = self.socket,
+            .ctx = self,
+            .blocking_read_start = Client.blockingReadStart,
+            .blocking_read = Client.blockingRead,
+            .blocking_read_end = Client.blockingReadStop,
+        };
+        defer http.cdp_client = null;
+
+        self.httpLoop(http) catch |err| {
+            log.err(.app, "CDP client loop", .{ .err = err });
+        };
+    }
+
+    fn stop(self: *Client) void {
+        posix.shutdown(self.socket, .recv) catch {};
+    }
+
+    fn httpLoop(self: *Client, http: *HttpClient) !void {
+        lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
+        while (true) {
+            const status = http.tick(self.timeout_ms) catch |err| {
+                log.err(.app, "http tick", .{ .err = err });
+                return;
+            };
+            if (status != .cdp_socket) {
+                log.info(.app, "CDP timeout", .{});
+                return;
+            }
+
+            if (self.readSocket() == false) {
+                return;
+            }
+
+            if (self.mode == .cdp) {
+                break;
+            }
+        }
+
+        return self.cdpLoop(http);
+    }
+
+    fn cdpLoop(self: *Client, http: *HttpClient) !void {
+        var cdp = &self.mode.cdp;
+        var last_message = timestamp(.monotonic);
+        var ms_remaining = self.timeout_ms;
+        while (true) {
+            switch (cdp.pageWait(ms_remaining)) {
+                .cdp_socket => {
+                    if (self.readSocket() == false) {
+                        return;
+                    }
+                    last_message = timestamp(.monotonic);
+                    ms_remaining = self.timeout_ms;
+                },
+                .no_page => {
+                    const status = http.tick(ms_remaining) catch |err| {
+                        log.err(.app, "http tick", .{ .err = err });
+                        return;
+                    };
+                    if (status != .cdp_socket) {
+                        log.info(.app, "CDP timeout", .{});
+                        return;
+                    }
+                    if (self.readSocket() == false) {
+                        return;
+                    }
+                    last_message = timestamp(.monotonic);
+                    ms_remaining = self.timeout_ms;
+                },
+                .done => {
+                    const elapsed = timestamp(.monotonic) - last_message;
+                    if (elapsed > ms_remaining) {
+                        log.info(.app, "CDP timeout", .{});
+                        return;
+                    }
+                    ms_remaining -= @intCast(elapsed);
+                },
+                .navigate => unreachable, // must have been handled by the session
+            }
+        }
+    }
 
     fn blockingReadStart(ctx: *anyopaque) bool {
@@ -315,7 +441,7 @@
         lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos });
 
         const request = self.reader.buf[0..self.reader.len];
-        if (request.len > MAX_HTTP_REQUEST_SIZE) {
+        if (request.len > Config.MAX_HTTP_REQUEST_SIZE) {
             self.writeHTTPErrorResponse(413, "Request too large");
             return error.RequestTooLarge;
         }
@@ -368,7 +494,7 @@
         }
 
         if (std.mem.eql(u8, url, "/json/version")) {
-            try self.send(self.server.json_version_response);
+            try self.send(self.json_version_response);
             // Chromedp (a Go driver) does an http request to /json/version
             // then to / (websocket upgrade) using a different connection.
             // Since we only allow 1 connection at a time, the 2nd one (the
@@ -473,7 +599,7 @@
             break :blk res;
         };
 
-        self.mode = .{ .cdp = try CDP.init(self.server.app, self) };
+        self.mode = .{ .cdp = try CDP.init(self.app, self.http, self) };
         return self.send(response);
     }
@@ -708,7 +834,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
             if (message_len > 125) {
                 return error.ControlTooLarge;
             }
-        } else if (message_len > MAX_MESSAGE_SIZE) {
+        } else if (message_len > Config.MAX_MESSAGE_SIZE) {
             return error.TooLarge;
         } else if (message_len > self.buf.len) {
             const len = self.buf.len;
@@ -736,7 +862,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
         if (is_continuation) {
             const fragments = &(self.fragments orelse return error.InvalidContinuation);
-            if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) {
+            if (fragments.message.items.len + message_len > Config.MAX_MESSAGE_SIZE) {
                 return error.TooLarge;
             }

View File

@@ -56,9 +56,9 @@ pub fn run(self: *TestHTTPServer, wg: *std.Thread.WaitGroup) !void {
     wg.finish();
 
-    while (true) {
+    while (!self.shutdown.load(.acquire)) {
         const conn = listener.accept() catch |err| {
-            if (self.shutdown.load(.acquire) or err == error.SocketNotListening) {
+            if (err == error.SocketNotListening) {
                 return;
             }
             return err;

View File

@@ -24,9 +24,9 @@ const ArenaAllocator = std.heap.ArenaAllocator;
 const js = @import("js/js.zig");
 const log = @import("../log.zig");
 const App = @import("../App.zig");
+const HttpClient = @import("../http/Client.zig");
 const ArenaPool = App.ArenaPool;
-const HttpClient = App.Http.Client;
 
 const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -51,6 +51,7 @@ transfer_arena: ArenaAllocator,
 const InitOpts = struct {
     env: js.Env.InitOpts = .{},
+    http_client: *HttpClient,
 };
 
 pub fn init(app: *App, opts: InitOpts) !Browser {
@@ -65,7 +66,7 @@ pub fn init(app: *App, opts: InitOpts) !Browser {
         .session = null,
         .allocator = allocator,
         .arena_pool = &app.arena_pool,
-        .http_client = app.http.client,
+        .http_client = opts.http_client,
         .call_arena = ArenaAllocator.init(allocator),
         .page_arena = ArenaAllocator.init(allocator),
         .session_arena = ArenaAllocator.init(allocator),

View File

@@ -37,6 +37,14 @@ const JsApis = bridge.JsApis;
 const Allocator = std.mem.Allocator;
 const IS_DEBUG = @import("builtin").mode == .Debug;
 
+fn initClassIds() void {
+    inline for (JsApis, 0..) |JsApi, i| {
+        JsApi.Meta.class_id = i;
+    }
+}
+
+var class_id_once = std.once(initClassIds);
+
 // The Env maps to a V8 isolate, which represents a isolated sandbox for
 // executing JavaScript. The Env is where we'll define our V8 <-> Zig bindings,
 // and it's where we'll start ExecutionWorlds, which actually execute JavaScript.
@@ -76,6 +84,9 @@ pub const InitOpts = struct {
 };
 
 pub fn init(app: *App, opts: InitOpts) !Env {
+    // Initialize class IDs once before any V8 work
+    class_id_once.call();
+
     const allocator = app.allocator;
     const snapshot = &app.snapshot;
@@ -117,8 +128,7 @@ pub fn init(app: *App, opts: InitOpts) !Env {
     temp_scope.init(isolate);
     defer temp_scope.deinit();
 
-    inline for (JsApis, 0..) |JsApi, i| {
-        JsApi.Meta.class_id = i;
+    inline for (JsApis, 0..) |_, i| {
         const data = v8.v8__Isolate__GetDataFromSnapshotOnce(isolate_handle, snapshot.data_start + i);
         const function_handle: *const v8.FunctionTemplate = @ptrCast(data);
         // Make function template eternal
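
Note: the class-id assignment moves behind std.once because, with one Env per CDP connection, several isolates can now be created while the ids are global and must be written exactly once. A minimal, self-contained sketch of the std.once behavior (the counter and test here are illustrative, not part of the module):

    const std = @import("std");

    var class_ids_assigned: u32 = 0;

    fn initClassIds() void {
        // Runs exactly once, even if call() races from several threads.
        class_ids_assigned += 1;
    }

    var class_id_once = std.once(initClassIds);

    test "std.once runs a single time" {
        class_id_once.call();
        class_id_once.call();
        try std.testing.expectEqual(@as(u32, 1), class_ids_assigned);
    }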

View File

@@ -38,11 +38,12 @@ const IS_DEBUG = @import("builtin").mode == .Debug;
 const Inspector = @This();
 
 unique_id: i64,
+allocator: Allocator,
 isolate: *v8.Isolate,
 handle: *v8.Inspector,
 client: *v8.InspectorClientImpl,
 default_context: ?v8.Global,
-session: ?Session,
+sessions: std.ArrayListUnmanaged(*Session),
 
 pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector {
     const self = try allocator.create(Inspector);
@@ -50,7 +51,8 @@ pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector {
 
     self.* = .{
         .unique_id = 1,
-        .session = null,
+        .allocator = allocator,
+        .sessions = .empty,
         .isolate = isolate,
         .client = undefined,
         .handle = undefined,
@@ -67,32 +69,43 @@ pub fn init(allocator: Allocator, isolate: *v8.Isolate) !*Inspector {
     return self;
 }
 
-pub fn deinit(self: *const Inspector, allocator: Allocator) void {
+pub fn deinit(self: *Inspector, allocator: Allocator) void {
     var hs: v8.HandleScope = undefined;
     v8.v8__HandleScope__CONSTRUCT(&hs, self.isolate);
     defer v8.v8__HandleScope__DESTRUCT(&hs);
 
-    if (self.session) |*s| {
-        s.deinit();
+    for (self.sessions.items) |session| {
+        session.deinit();
+        self.allocator.destroy(session);
     }
+    self.sessions.deinit(self.allocator);
 
     v8.v8_inspector__Client__IMPL__DELETE(self.client);
     v8.v8_inspector__Inspector__DELETE(self.handle);
     allocator.destroy(self);
 }
 
-pub fn startSession(self: *Inspector, ctx: anytype) *Session {
-    if (comptime IS_DEBUG) {
-        std.debug.assert(self.session == null);
-    }
-    self.session = @as(Session, undefined);
-    Session.init(&self.session.?, self, ctx);
-    return &self.session.?;
+pub fn startSession(self: *Inspector, ctx: anytype) !*Session {
+    const session = try self.allocator.create(Session);
+    errdefer self.allocator.destroy(session);
+
+    Session.init(session, self, ctx);
+    try self.sessions.append(self.allocator, session);
+    return session;
 }
 
-pub fn stopSession(self: *Inspector) void {
-    self.session.?.deinit();
-    self.session = null;
+pub fn stopSession(self: *Inspector, session: *Session) void {
+    for (self.sessions.items, 0..) |s, i| {
+        if (s == session) {
+            _ = self.sessions.swapRemove(i);
+            session.deinit();
+            self.allocator.destroy(session);
+            return;
+        }
+    }
+
+    if (comptime IS_DEBUG) {
+        @panic("Tried to stop unknown inspector session");
+    }
 }
 
 // From CDP docs

View File

@@ -28,6 +28,7 @@ const js = @import("../browser/js/js.zig");
 const App = @import("../App.zig");
 const Browser = @import("../browser/Browser.zig");
 const Session = @import("../browser/Session.zig");
+const HttpClient = @import("../http/Client.zig");
 const Page = @import("../browser/Page.zig");
 const Incrementing = @import("../id.zig").Incrementing;
 const Notification = @import("../Notification.zig");
@@ -85,10 +86,11 @@ pub fn CDPT(comptime TypeProvider: type) type {
         const Self = @This();
 
-        pub fn init(app: *App, client: TypeProvider.Client) !Self {
+        pub fn init(app: *App, http_client: *HttpClient, client: TypeProvider.Client) !Self {
             const allocator = app.allocator;
             const browser = try Browser.init(app, .{
                 .env = .{ .with_inspector = true },
+                .http_client = http_client,
             });
             errdefer browser.deinit();
@@ -403,8 +405,9 @@ pub fn BrowserContext(comptime CDP_T: type) type {
             const session = try cdp.browser.newSession(notification);
 
             const browser = &cdp.browser;
-            const inspector_session = browser.env.inspector.?.startSession(self);
-            errdefer browser.env.inspector.?.stopSession();
+            const inspector = browser.env.inspector.?;
+            const inspector_session = try inspector.startSession(self);
+            errdefer inspector.stopSession(inspector_session);
 
             var registry = Node.Registry.init(allocator);
             errdefer registry.deinit();
@@ -455,7 +458,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
             // before deinit it.
             browser.env.inspector.?.resetContextGroup();
             browser.runMessageLoop();
-            browser.env.inspector.?.stopSession();
+            browser.env.inspector.?.stopSession(self.inspector_session);
 
             // abort all intercepted requests before closing the sesion/page
             // since some of these might callback into the page/scriptmanager

View File

@@ -85,7 +85,7 @@ const TestContext = struct {
             self.client = Client.init(self.arena.allocator());
             // Don't use the arena here. We want to detect leaks in CDP.
             // The arena is only for test-specific stuff
-            self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable;
+            self.cdp_ = TestCDP.init(base.test_app, base.test_http, &self.client.?) catch unreachable;
         }
         return &self.cdp_.?;
     }

View File

@@ -17,8 +17,6 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 const std = @import("std");
-const lp = @import("lightpanda");
-const Config = @import("../Config.zig");
 
 pub const c = @cImport({
     @cInclude("curl/curl.h");
@@ -28,6 +26,8 @@ pub const ENABLE_DEBUG = false;
 pub const Client = @import("Client.zig");
 pub const Transfer = Client.Transfer;
 
+const lp = @import("lightpanda");
+const Config = @import("../Config.zig");
 const log = @import("../log.zig");
 const errors = @import("errors.zig");
@@ -41,8 +41,8 @@ const ArenaAllocator = std.heap.ArenaAllocator;
 // once for all http connections is a win.
 const Http = @This();
 
+allocator: Allocator,
 config: *const Config,
-client: *Client,
 ca_blob: ?c.curl_blob,
 arena: ArenaAllocator,
@@ -59,40 +59,28 @@ pub fn init(allocator: Allocator, config: *const Config) !Http {
     var ca_blob: ?c.curl_blob = null;
     if (config.tlsVerifyHost()) {
-        ca_blob = try loadCerts(allocator, arena.allocator());
+        ca_blob = try loadCerts(allocator);
     }
 
-    var client = try Client.init(allocator, ca_blob, config);
-    errdefer client.deinit();
-
     return .{
         .arena = arena,
-        .client = client,
         .ca_blob = ca_blob,
+        .allocator = allocator,
         .config = config,
     };
 }
 
 pub fn deinit(self: *Http) void {
-    self.client.deinit();
+    if (self.ca_blob) |ca_blob| {
+        const data: [*]u8 = @ptrCast(ca_blob.data);
+        self.allocator.free(data[0..ca_blob.len]);
+    }
     c.curl_global_cleanup();
     self.arena.deinit();
 }
 
-pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus {
-    return self.client.tick(timeout_ms) catch |err| {
-        log.err(.app, "http poll", .{ .err = err });
-        return .normal;
-    };
-}
-
-pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void {
-    lp.assert(self.client.cdp_client == null, "Http addCDPClient existing", .{});
-    self.client.cdp_client = cdp_client;
-}
-
-pub fn removeCDPClient(self: *Http) void {
-    self.client.cdp_client = null;
+pub fn createClient(self: *Http, allocator: Allocator) !*Client {
+    return Client.init(allocator, self.ca_blob, self.config);
 }
 
 pub fn newConnection(self: *Http) !Connection {
@@ -348,7 +336,7 @@ pub const Method = enum(u8) {
 // This whole rescan + decode is really just needed for MacOS. On Linux
 // bundle.rescan does find the .pem file(s) which could be in a few different
 // places, so it's still useful, just not efficient.
-fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
+fn loadCerts(allocator: Allocator) !c.curl_blob {
     var bundle: std.crypto.Certificate.Bundle = .{};
     try bundle.rescan(allocator);
     defer bundle.deinit(allocator);
@@ -371,8 +359,9 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
         (bundle.map.count() * 75) + // start / end per certificate + extra, just in case
         (encoded_size / 64) // newline per 64 characters
     ;
-    try arr.ensureTotalCapacity(arena, buffer_size);
-    var writer = arr.writer(arena);
+    try arr.ensureTotalCapacity(allocator, buffer_size);
+    errdefer arr.deinit(allocator);
+    var writer = arr.writer(allocator);
 
     var it = bundle.map.valueIterator();
     while (it.next()) |index| {
@@ -385,11 +374,16 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
     }
 
     // Final encoding should not be larger than our initial size estimate
-    lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len });
+    lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len });
+
+    // Allocate exactly the size needed and copy the data
+    const result = try allocator.dupe(u8, arr.items);
+    // Free the original oversized allocation
+    arr.deinit(allocator);
 
     return .{
-        .len = arr.items.len,
-        .data = arr.items.ptr,
+        .len = result.len,
+        .data = result.ptr,
         .flags = 0,
     };
 }

View File

@@ -38,9 +38,12 @@ pub const FetchOpts = struct {
     dump: dump.RootOpts,
     writer: ?*std.Io.Writer = null,
 };
 
-pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void {
-    var browser = try Browser.init(app, .{});
-    const notification = try Notification.init(app.allocator);
+pub fn fetch(allocator: std.mem.Allocator, app: *App, url: [:0]const u8, opts: FetchOpts) !void {
+    const http_client = try app.http.createClient(allocator);
+    defer http_client.deinit();
+
+    var browser = try Browser.init(app, .{ .http_client = http_client });
+    const notification = try Notification.init(allocator);
     defer notification.deinit();
     defer browser.deinit();

View File

@@ -49,7 +49,7 @@ const Opts = struct {
 
 pub var opts = Opts{};
 
-// synchronizes writes to the output
+// synchronizes access to _interceptor
 var out_lock: Thread.Mutex = .{};
 
 // synchronizes access to last_log
@@ -146,7 +146,14 @@ fn logTo(comptime scope: Scope, level: Level, comptime msg: []const u8, data: an
     }
     out.flush() catch return;
 
-    const interceptor = _interceptor orelse return;
+    // Copy the interceptor under lock, then release before doing I/O
+    const interceptor = blk: {
+        out_lock.lock();
+        defer out_lock.unlock();
+        break :blk _interceptor orelse return;
+    };
+
+    // I/O operations happen without holding the lock to minimize contention
     if (interceptor.writer(interceptor.ctx, scope, level)) |iwriter| {
         try logLogfmt(scope, level, msg, data, iwriter);
         try iwriter.flush();
@@ -368,10 +375,14 @@ fn timestamp(comptime mode: datetime.TimestampMode) u64 {
 var _interceptor: ?Interceptor = null;
 
 pub fn registerInterceptor(interceptor: Interceptor) void {
+    out_lock.lock();
+    defer out_lock.unlock();
     _interceptor = interceptor;
 }
 
 pub fn unregisterInterceptor() void {
+    out_lock.lock();
+    defer out_lock.unlock();
     _interceptor = null;
 }
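
Note: the logTo change above copies the interceptor while holding the lock and calls it with the lock released. A reduced, self-contained sketch of that idea (names here are illustrative, not the log module's API):

    const std = @import("std");

    var mu: std.Thread.Mutex = .{};
    var hook: ?*const fn ([]const u8) void = null;

    fn emit(msg: []const u8) void {
        // Take a copy of the hook under the lock, then release it so that
        // slow I/O inside the hook never blocks other logging threads.
        const h = blk: {
            mu.lock();
            defer mu.unlock();
            break :blk hook orelse return;
        };
        h(msg);
    }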

View File

@@ -93,7 +93,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
         };
 
         // _server is global to handle graceful shutdown.
-        var server = try lp.Server.init(app, address);
+        var server = try lp.Server.init(allocator, app, address);
         defer server.deinit();
 
         try sighandler.on(lp.Server.stop, .{&server});
@@ -123,7 +123,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
             fetch_opts.writer = &writer.interface;
         }
 
-        lp.fetch(app, url, fetch_opts) catch |err| {
+        lp.fetch(allocator, app, url, fetch_opts) catch |err| {
            log.fatal(.app, "fetch error", .{ .err = err, .url = url });
            return err;
        };

View File

@@ -44,8 +44,11 @@ pub fn main() !void {
     var test_arena = std.heap.ArenaAllocator.init(allocator);
     defer test_arena.deinit();
 
-    var browser = try lp.Browser.init(app, .{});
-    const notification = try lp.Notification.init(app.allocator);
+    const http_client = try app.http.createClient(allocator);
+    defer http_client.deinit();
+
+    var browser = try lp.Browser.init(app, .{ .http_client = http_client });
+    const notification = try lp.Notification.init(allocator);
     defer notification.deinit();
     defer browser.deinit();

View File

@@ -67,7 +67,10 @@ pub fn main() !void {
     var app = try lp.App.init(allocator, &config);
     defer app.deinit();
 
-    var browser = try lp.Browser.init(app, .{});
+    const http_client = try app.http.createClient(allocator);
+    defer http_client.deinit();
+
+    var browser = try lp.Browser.init(app, .{ .http_client = http_client });
     defer browser.deinit();
 
     // An arena for running each tests. Is reset after every test.
@@ -109,7 +112,7 @@ fn run(
     test_file: []const u8,
     err_out: *?[]const u8,
 ) ![]const u8 {
-    const notification = try lp.Notification.init(browser.allocator);
+    const notification = try lp.Notification.init(browser.app.allocator);
     defer notification.deinit();
 
     const session = try browser.newSession(notification);

View File

@@ -39,6 +39,7 @@ pub fn reset() void {
 const App = @import("App.zig");
 const js = @import("browser/js/js.zig");
 const Config = @import("Config.zig");
+const Client = @import("http/Client.zig");
 const Page = @import("browser/Page.zig");
 const Browser = @import("browser/Browser.zig");
 const Session = @import("browser/Session.zig");
@@ -334,6 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool {
 }
 
 pub var test_app: *App = undefined;
+pub var test_http: *Client = undefined;
 pub var test_browser: Browser = undefined;
 pub var test_notification: *Notification = undefined;
 pub var test_session: *Session = undefined;
@@ -472,7 +474,10 @@ test "tests:beforeAll" {
     test_app = try App.init(test_allocator, &test_config);
     errdefer test_app.deinit();
 
-    test_browser = try Browser.init(test_app, .{});
+    test_http = try test_app.http.createClient(test_allocator);
+    errdefer test_http.deinit();
+
+    test_browser = try Browser.init(test_app, .{ .http_client = test_http });
     errdefer test_browser.deinit();
 
     // Create notification for testing
@@ -519,13 +524,15 @@ test "tests:afterAll" {
     test_notification.deinit();
     test_browser.deinit();
+    test_http.deinit();
     test_app.deinit();
 
     test_config.deinit(@import("root").tracking_allocator);
 }
 
 fn serveCDP(wg: *std.Thread.WaitGroup) !void {
     const address = try std.net.Address.parseIp("127.0.0.1", 9583);
-    test_cdp_server = try Server.init(test_app, address);
+    const test_allocator = @import("root").tracking_allocator;
+    test_cdp_server = try Server.init(test_allocator, test_app, address);
     wg.finish();