Implement multi CDP connections

This commit is contained in:
Nikolay Govorov
2026-01-21 20:32:06 +00:00
parent 24b7035b1b
commit 45fca49329
19 changed files with 1775 additions and 1348 deletions

View File

@@ -27,6 +27,7 @@ const Platform = @import("browser/js/Platform.zig");
const Notification = @import("Notification.zig"); const Notification = @import("Notification.zig");
const Telemetry = @import("telemetry/telemetry.zig").Telemetry; const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
const SharedState = @import("SharedState.zig");
// Container for global state / objects that various parts of the system // Container for global state / objects that various parts of the system
// might need. // might need.
@@ -40,6 +41,7 @@ telemetry: Telemetry,
allocator: Allocator, allocator: Allocator,
app_dir_path: ?[]const u8, app_dir_path: ?[]const u8,
notification: *Notification, notification: *Notification,
shared: ?*SharedState = null,
shutdown: bool = false, shutdown: bool = false,
pub const RunMode = enum { pub const RunMode = enum {
@@ -59,6 +61,8 @@ pub const Config = struct {
http_max_host_open: ?u8 = null, http_max_host_open: ?u8 = null,
http_max_concurrent: ?u8 = null, http_max_concurrent: ?u8 = null,
user_agent: [:0]const u8, user_agent: [:0]const u8,
max_sessions: u32 = 10, // Max concurrent CDP connections
session_memory_limit: usize = 64 * 1024 * 1024, // 64MB per session
}; };
pub fn init(allocator: Allocator, config: Config) !*App { pub fn init(allocator: Allocator, config: Config) !*App {
@@ -67,6 +71,8 @@ pub fn init(allocator: Allocator, config: Config) !*App {
app.config = config; app.config = config;
app.allocator = allocator; app.allocator = allocator;
app.shared = null;
app.shutdown = false;
app.notification = try Notification.init(allocator, null); app.notification = try Notification.init(allocator, null);
errdefer app.notification.deinit(); errdefer app.notification.deinit();
@@ -105,6 +111,12 @@ pub fn deinit(self: *App) void {
} }
const allocator = self.allocator; const allocator = self.allocator;
if (self.shared) |shared| {
shared.deinit();
self.shared = null;
}
if (self.app_dir_path) |app_dir_path| { if (self.app_dir_path) |app_dir_path| {
allocator.free(app_dir_path); allocator.free(app_dir_path);
self.app_dir_path = null; self.app_dir_path = null;
@@ -118,6 +130,31 @@ pub fn deinit(self: *App) void {
allocator.destroy(self); allocator.destroy(self);
} }
/// Create SharedState for multi-session server mode.
/// This must be called before starting the server for multi-CDP support.
/// The returned SharedState is owned by the App: it is stored in
/// `self.shared` and released in `deinit`; callers must not free it.
/// Returns error.SharedStateAlreadyExists on a second call.
pub fn createSharedState(self: *App) !*SharedState {
    if (self.shared != null) {
        return error.SharedStateAlreadyExists;
    }
    // Snapshot the relevant App config into the SharedState so session
    // threads never need to reach back into the App.
    const shared = try SharedState.init(self.allocator, .{
        .run_mode = self.config.run_mode,
        .tls_verify_host = self.config.tls_verify_host,
        .http_proxy = self.config.http_proxy,
        .proxy_bearer_token = self.config.proxy_bearer_token,
        .http_timeout_ms = self.config.http_timeout_ms,
        .http_connect_timeout_ms = self.config.http_connect_timeout_ms,
        .http_max_host_open = self.config.http_max_host_open,
        .http_max_concurrent = self.config.http_max_concurrent,
        .user_agent = self.config.user_agent,
        .max_sessions = self.config.max_sessions,
        .session_memory_limit = self.config.session_memory_limit,
    });
    self.shared = shared;
    return shared;
}
fn getAndMakeAppDir(allocator: Allocator) ?[]const u8 { fn getAndMakeAppDir(allocator: Allocator) ?[]const u8 {
if (@import("builtin").is_test) { if (@import("builtin").is_test) {
return allocator.dupe(u8, "/tmp") catch unreachable; return allocator.dupe(u8, "/tmp") catch unreachable;

186
src/LimitedAllocator.zig Normal file
View File

@@ -0,0 +1,186 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
/// Per-session memory limiting allocator.
/// Wraps a backing allocator and enforces a maximum memory limit.
/// Thread-local: each SessionThread creates its own LimitedAllocator.
const LimitedAllocator = @This();

// Allocator all requests are forwarded to once the limit check passes.
backing: Allocator,
// Bytes currently attributed to this session (sum of live request lengths).
bytes_allocated: usize,
// Hard budget; requests that would push bytes_allocated past it fail.
max_bytes: usize,
/// Create a limiter over `backing` that refuses to exceed `max_bytes`.
pub fn init(backing: Allocator, max_bytes: usize) LimitedAllocator {
    return LimitedAllocator{
        .max_bytes = max_bytes,
        .bytes_allocated = 0,
        .backing = backing,
    };
}
/// Obtain the std.mem.Allocator interface backed by this limiter.
/// `self` must outlive every allocation made through the returned value.
pub fn allocator(self: *LimitedAllocator) Allocator {
    return Allocator{ .ptr = self, .vtable = &vtable };
}
/// Total bytes currently attributed to this session.
pub fn bytesAllocated(self: *const LimitedAllocator) usize {
    return self.bytes_allocated;
}

/// Bytes left before the session hits its limit (saturates at 0).
pub fn bytesRemaining(self: *const LimitedAllocator) usize {
    return self.max_bytes -| self.bytes_allocated;
}
// Allocator vtable dispatching into the limit-enforcing wrappers below.
const vtable: Allocator.VTable = .{
    .alloc = alloc,
    .resize = resize,
    .remap = remap,
    .free = free,
};
// Allocation hook: reject requests that would exceed the session budget,
// and only charge the counter when the backing allocation succeeds.
fn alloc(ctx: *anyopaque, len: usize, alignment: std.mem.Alignment, ret_addr: usize) ?[*]u8 {
    const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
    // Saturating add so a huge `len` cannot wrap past the comparison.
    const projected = self.bytes_allocated +| len;
    if (projected > self.max_bytes) {
        return null; // Out of memory for this session
    }
    const ptr = self.backing.rawAlloc(len, alignment, ret_addr) orelse return null;
    self.bytes_allocated += len;
    return ptr;
}
// Resize hook: growth is checked against the budget first; the counter is
// adjusted by the size delta only if the backing resize succeeds.
fn resize(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) bool {
    const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
    const growing = new_len > buf.len;
    if (growing and self.bytes_allocated +| (new_len - buf.len) > self.max_bytes) {
        return false; // Would exceed limit
    }
    if (!self.backing.rawResize(buf, alignment, new_len, ret_addr)) {
        return false;
    }
    if (growing) {
        self.bytes_allocated += new_len - buf.len;
    } else {
        self.bytes_allocated -= buf.len - new_len;
    }
    return true;
}
// Remap hook: same budget/accounting discipline as resize(), but the
// backing allocation may move; returns the (possibly new) pointer.
fn remap(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) ?[*]u8 {
    const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
    const growing = new_len > buf.len;
    if (growing and self.bytes_allocated +| (new_len - buf.len) > self.max_bytes) {
        return null; // Would exceed limit
    }
    const moved = self.backing.rawRemap(buf, alignment, new_len, ret_addr) orelse return null;
    if (growing) {
        self.bytes_allocated += new_len - buf.len;
    } else {
        self.bytes_allocated -= buf.len - new_len;
    }
    return moved;
}
// Free hook: release the memory and credit the counter back.
// The subtraction saturates so accounting drift can never underflow.
fn free(ctx: *anyopaque, buf: []u8, alignment: std.mem.Alignment, ret_addr: usize) void {
    const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
    self.backing.rawFree(buf, alignment, ret_addr);
    self.bytes_allocated -|= buf.len;
}
const testing = std.testing;

// Counter reflects a successful allocation; remaining budget shrinks.
test "LimitedAllocator: basic allocation" {
    var limited = LimitedAllocator.init(testing.allocator, 1024);
    const alloc_ = limited.allocator();
    const slice = try alloc_.alloc(u8, 100);
    defer alloc_.free(slice);
    try testing.expectEqual(100, limited.bytesAllocated());
    try testing.expectEqual(924, limited.bytesRemaining());
}

// Over-budget requests fail and leave the counter untouched.
test "LimitedAllocator: exceeds limit" {
    var limited = LimitedAllocator.init(testing.allocator, 100);
    const alloc_ = limited.allocator();
    // Allocation should fail with OutOfMemory when exceeding limit
    try testing.expectError(error.OutOfMemory, alloc_.alloc(u8, 200));
    try testing.expectEqual(0, limited.bytesAllocated());
}

// Freeing credits the counter back to zero.
test "LimitedAllocator: free updates counter" {
    var limited = LimitedAllocator.init(testing.allocator, 1024);
    const alloc_ = limited.allocator();
    const slice = try alloc_.alloc(u8, 100);
    try testing.expectEqual(100, limited.bytesAllocated());
    alloc_.free(slice);
    try testing.expectEqual(0, limited.bytesAllocated());
}

// Interleaved allocs/frees keep the running total consistent.
test "LimitedAllocator: multiple allocations" {
    var limited = LimitedAllocator.init(testing.allocator, 1024);
    const alloc_ = limited.allocator();
    const s1 = try alloc_.alloc(u8, 100);
    const s2 = try alloc_.alloc(u8, 200);
    const s3 = try alloc_.alloc(u8, 300);
    try testing.expectEqual(600, limited.bytesAllocated());
    alloc_.free(s2);
    try testing.expectEqual(400, limited.bytesAllocated());
    alloc_.free(s1);
    alloc_.free(s3);
    try testing.expectEqual(0, limited.bytesAllocated());
}

// Filling the budget exactly is allowed; one byte more is not.
test "LimitedAllocator: allocation at limit boundary" {
    var limited = LimitedAllocator.init(testing.allocator, 100);
    const alloc_ = limited.allocator();
    const s1 = try alloc_.alloc(u8, 50);
    defer alloc_.free(s1);
    const s2 = try alloc_.alloc(u8, 50);
    defer alloc_.free(s2);
    // Should fail - at limit
    try testing.expectError(error.OutOfMemory, alloc_.alloc(u8, 1));
}

File diff suppressed because it is too large Load Diff

133
src/SessionManager.zig Normal file
View File

@@ -0,0 +1,133 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const SessionThread = @import("SessionThread.zig");
/// Thread-safe collection of active CDP sessions.
/// Manages lifecycle and enforces connection limits.
const SessionManager = @This();

// Guards `sessions`; every read and mutation of the list goes through it.
mutex: std.Thread.Mutex,
// Active sessions. stopAll() joins and deinits whatever is still listed;
// sessions also remove themselves on exit (see SessionThread.run).
sessions: std.ArrayListUnmanaged(*SessionThread),
allocator: Allocator,
// Hard cap on concurrent sessions, enforced by add().
max_sessions: u32,
/// Build an empty manager; nothing is allocated until the first add().
pub fn init(allocator: Allocator, max_sessions: u32) SessionManager {
    return SessionManager{
        .allocator = allocator,
        .max_sessions = max_sessions,
        .sessions = .{},
        .mutex = .{},
    };
}
/// Stop every active session (joining their threads) and release the list.
pub fn deinit(self: *SessionManager) void {
    self.stopAll();
    self.sessions.deinit(self.allocator);
}
/// Add a new session to the manager.
/// Returns error.TooManySessions if the limit is reached.
/// Add a new session to the manager.
/// Returns error.TooManySessions if the limit is reached.
pub fn add(self: *SessionManager, session: *SessionThread) !void {
    self.mutex.lock();
    defer self.mutex.unlock();
    if (self.sessions.items.len < self.max_sessions) {
        return self.sessions.append(self.allocator, session);
    }
    return error.TooManySessions;
}
/// Remove a session from the manager.
/// Called when a session terminates.
/// Remove a session from the manager.
/// Called when a session terminates; a no-op if it is not present.
pub fn remove(self: *SessionManager, session: *SessionThread) void {
    self.mutex.lock();
    defer self.mutex.unlock();
    const idx = std.mem.indexOfScalar(*SessionThread, self.sessions.items, session) orelse return;
    _ = self.sessions.swapRemove(idx);
}
/// Stop all active sessions and wait for them to terminate.
/// Stop all active sessions and wait for them to terminate.
/// Joins and destroys every session still registered; leaves the list empty.
pub fn stopAll(self: *SessionManager) void {
    // First, signal every session to stop so they all wind down in parallel.
    {
        self.mutex.lock();
        defer self.mutex.unlock();
        for (self.sessions.items) |session| {
            session.stop();
        }
    }
    // Drain the list one session at a time. Popping under the lock cannot
    // fail, unlike the previous copy-the-list approach whose appendSlice
    // could hit OutOfMemory and `catch return`, silently skipping the join
    // and deinit of every session. join()/deinit() must run WITHOUT the
    // mutex held: a finishing session thread calls remove(), which takes it.
    while (true) {
        const session = blk: {
            self.mutex.lock();
            defer self.mutex.unlock();
            break :blk self.sessions.pop() orelse break;
        };
        session.join();
        session.deinit();
    }
}
/// Get the current number of active sessions.
/// Get the current number of active sessions (thread-safe snapshot;
/// may be stale by the time the caller inspects it).
pub fn count(self: *SessionManager) usize {
    self.mutex.lock();
    defer self.mutex.unlock();
    const n = self.sessions.items.len;
    return n;
}
const testing = std.testing;

// Both tests below only exercise construction/teardown: creating real
// SessionThreads requires sockets and a SharedState.
test "SessionManager: add and remove" {
    var manager = SessionManager.init(testing.allocator, 10);
    defer manager.deinit();
    try testing.expectEqual(0, manager.count());
}

test "SessionManager: max sessions limit" {
    var manager = SessionManager.init(testing.allocator, 2);
    defer manager.deinit();
    // We can't easily create mock SessionThreads for this test,
    // so we just verify the initialization works
    try testing.expectEqual(0, manager.count());
}

885
src/SessionThread.zig Normal file
View File

@@ -0,0 +1,885 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const lp = @import("lightpanda");
const builtin = @import("builtin");
const posix = std.posix;
const net = std.net;
const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig");
const SharedState = @import("SharedState.zig");
const SessionManager = @import("SessionManager.zig");
const LimitedAllocator = @import("LimitedAllocator.zig");
const HttpClient = @import("http/Client.zig");
const CDP = @import("cdp/cdp.zig").CDP;
const BrowserSession = @import("browser/Session.zig");
const timestamp = @import("datetime.zig").timestamp;
// Largest plain-HTTP request (the upgrade handshake) we buffer before
// rejecting with 413 (see processHTTPRequest).
const MAX_HTTP_REQUEST_SIZE = 4096;
// Upper bound on an assembled websocket message: 512KB of payload plus a
// margin (14 + 140) — presumably frame-header/overhead slack inherited
// from Server.zig; TODO confirm against the original.
const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
/// Encapsulates a single CDP session running in its own thread.
/// Each SessionThread has:
/// - Its own client socket
/// - Its own HttpClient (with shared curl_share from SharedState)
/// - Its own V8 Isolate (via Browser/CDP)
/// - Its own memory-limited allocator
const SessionThread = @This();

// Worker thread handle; null until spawn() starts it and after join().
thread: ?std.Thread,
// Set by stop(); polled by the run loops for cooperative termination.
shutdown: std.atomic.Value(bool),
client_socket: posix.socket_t,
// Process-wide shared state (allocator, shared curl handle, limits).
shared: *SharedState,
// Manager we deregister from when the thread exits (see run()).
session_manager: *SessionManager,
// Enforces the per-session memory budget for all session allocations.
limited_allocator: LimitedAllocator,
// Created in runInner(); destroyed in deinit().
http_client: ?*HttpClient,
// Inactivity timeout for the CDP loops, in milliseconds.
timeout_ms: u32,
// Pre-rendered response body for GET /json/version.
json_version_response: []const u8,
/// Allocate a SessionThread and launch its worker thread.
/// The returned pointer is released via deinit(), which destroys it with
/// shared.allocator.
pub fn spawn(
    shared: *SharedState,
    session_manager: *SessionManager,
    socket: posix.socket_t,
    timeout_ms: u32,
    json_version_response: []const u8,
    session_memory_limit: usize,
) !*SessionThread {
    const self = try shared.allocator.create(SessionThread);
    errdefer shared.allocator.destroy(self);
    // Fully initialize the struct before the worker thread can observe it.
    self.* = SessionThread{
        .shared = shared,
        .session_manager = session_manager,
        .client_socket = socket,
        .timeout_ms = timeout_ms,
        .json_version_response = json_version_response,
        .limited_allocator = LimitedAllocator.init(shared.allocator, session_memory_limit),
        .http_client = null,
        .thread = null,
        .shutdown = std.atomic.Value(bool).init(false),
    };
    // The worker starts immediately; on spawn failure errdefer frees self.
    self.thread = try std.Thread.spawn(.{}, run, .{self});
    return self;
}
/// Signal the session to stop and interrupt any blocking socket read.
/// Idempotent: only the first caller performs the shutdown/close below,
/// so a repeated stop() (e.g. SessionManager.stopAll racing another
/// caller) cannot double-close a possibly-reused fd on macOS/BSD.
pub fn stop(self: *SessionThread) void {
    // swap returns the previous value: if it was already true, another
    // caller has already handled the socket.
    if (self.shutdown.swap(true, .release)) {
        return;
    }
    // Close the socket to interrupt any blocking reads
    if (self.client_socket != -1) {
        switch (builtin.target.os.tag) {
            // On Linux, shutdown(recv) wakes blocked readers while keeping
            // the fd valid for the worker thread to finish with.
            .linux => posix.shutdown(self.client_socket, .recv) catch {},
            .macos, .freebsd, .netbsd, .openbsd => posix.close(self.client_socket),
            else => {},
        }
    }
}
/// Block until the worker thread has exited. Safe to call repeatedly;
/// subsequent calls are no-ops.
pub fn join(self: *SessionThread) void {
    const worker = self.thread orelse return;
    self.thread = null;
    worker.join();
}
/// Release all session resources. Joins the worker thread first so
/// nothing is freed while it might still be running.
pub fn deinit(self: *SessionThread) void {
    self.join();
    if (self.http_client) |client| {
        client.deinit();
        self.http_client = null;
    }
    self.shared.allocator.destroy(self);
}
// Allocator that charges this session's memory budget (LimitedAllocator).
fn sessionAllocator(self: *SessionThread) Allocator {
    return self.limited_allocator.allocator();
}
// Thread entry point: delegates to runInner and logs any failure.
fn run(self: *SessionThread) void {
    // Always deregister from the manager, on success and error paths alike.
    defer self.session_manager.remove(self);
    self.runInner() catch |err| {
        log.err(.app, "session thread error", .{ .err = err });
    };
}
/// Session main loop: builds the per-session HTTP client and Client
/// state, serves the HTTP/websocket upgrade, then pumps CDP messages
/// until shutdown, disconnect, or inactivity timeout.
fn runInner(self: *SessionThread) !void {
    const alloc = self.sessionAllocator();
    // Create our own HTTP client using the shared curl_share
    self.http_client = try self.shared.createHttpClient(alloc);
    errdefer {
        if (self.http_client) |client| {
            client.deinit();
            self.http_client = null;
        }
    }
    const client = try alloc.create(Client);
    defer alloc.destroy(client);
    client.* = try Client.init(self.client_socket, self);
    defer client.deinit();
    var http = self.http_client.?;
    // Register our socket with the HTTP client so its event loop can
    // wake us when CDP data arrives.
    http.cdp_client = .{
        .socket = self.client_socket,
        .ctx = client,
        .blocking_read_start = Client.blockingReadStart,
        .blocking_read = Client.blockingRead,
        .blocking_read_end = Client.blockingReadStop,
    };
    defer http.cdp_client = null;
    lp.assert(client.mode == .http, "SessionThread.run invalid mode", .{});
    const timeout_ms = self.timeout_ms;
    // Phase 1: plain HTTP until the websocket upgrade flips mode to .cdp.
    while (!self.shutdown.load(.acquire)) {
        const tick_result = http.tick(timeout_ms) catch .normal;
        if (tick_result != .cdp_socket) {
            log.info(.app, "CDP timeout", .{});
            return;
        }
        if (client.readSocket() == false) {
            return;
        }
        if (client.mode == .cdp) {
            break; // switch to CDP loop
        }
    }
    // A shutdown can exit the loop above before the upgrade happened; the
    // union is then still .http and taking &client.mode.cdp below would be
    // an illegal union field access (safety panic in Debug builds).
    if (client.mode != .cdp) {
        return;
    }
    var cdp = &client.mode.cdp;
    var last_message = timestamp(.monotonic);
    var ms_remaining = timeout_ms;
    // Phase 2: CDP message pump with an inactivity timeout. The timeout
    // budget resets on every received message.
    while (!self.shutdown.load(.acquire)) {
        switch (cdp.pageWait(ms_remaining)) {
            .cdp_socket => {
                if (client.readSocket() == false) {
                    return;
                }
                last_message = timestamp(.monotonic);
                ms_remaining = timeout_ms;
            },
            .no_page => {
                const tick_res = http.tick(ms_remaining) catch .normal;
                if (tick_res != .cdp_socket) {
                    log.info(.app, "CDP timeout", .{});
                    return;
                }
                if (client.readSocket() == false) {
                    return;
                }
                last_message = timestamp(.monotonic);
                ms_remaining = timeout_ms;
            },
            .done => {
                const elapsed = timestamp(.monotonic) - last_message;
                if (elapsed > ms_remaining) {
                    log.info(.app, "CDP timeout", .{});
                    return;
                }
                ms_remaining -= @intCast(elapsed);
            },
            .navigate => unreachable,
        }
    }
}
/// The CDP/WebSocket client - adapted from Server.zig
/// Owns the plain-HTTP -> websocket upgrade state machine and, once
/// upgraded, the CDP instance that consumes websocket messages.
pub const Client = struct {
    // Starts as .http; switches to .cdp on a successful upgrade
    // (see upgradeConnection).
    mode: union(enum) {
        http: void,
        cdp: CDP,
    },
    session_thread: *SessionThread,
    // Frame reader; EXPECT_MASK = true because client-to-server frames
    // must be masked per RFC 6455.
    reader: Reader(true),
    socket: posix.socket_t,
    // Original fcntl flags; restored after temporary blocking-mode switches.
    socket_flags: usize,
    // Arena for outgoing frames; reset after every send().
    send_arena: ArenaAllocator,

    // Pre-encoded server frames: an empty pong and close frames carrying
    // 2-byte status codes (1000 normal, 1009 too big, 1002 protocol
    // error, 4000 timeout).
    const EMPTY_PONG = [_]u8{ 138, 0 };
    const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 };
    const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 };
    const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 };
    const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 };

    // The socket must already be non-blocking; asserted below.
    fn init(socket: posix.socket_t, session_thread: *SessionThread) !Client {
        const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
        const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
        lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
        const alloc = session_thread.sessionAllocator();
        var reader = try Reader(true).init(alloc);
        errdefer reader.deinit();
        return .{
            .socket = socket,
            .session_thread = session_thread,
            .reader = reader,
            .mode = .{ .http = {} },
            .socket_flags = socket_flags,
            .send_arena = ArenaAllocator.init(alloc),
        };
    }

    fn deinit(self: *Client) void {
        switch (self.mode) {
            .cdp => |*cdp| cdp.deinit(),
            .http => {},
        }
        self.reader.deinit();
        self.send_arena.deinit();
    }

    // Callback for http.cdp_client: temporarily switch the socket to
    // blocking mode; returns false if the fcntl fails.
    fn blockingReadStart(ctx: *anyopaque) bool {
        const self: *Client = @ptrCast(@alignCast(ctx));
        _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| {
            log.warn(.app, "CDP blockingReadStart", .{ .err = err });
            return false;
        };
        return true;
    }

    // Callback for http.cdp_client: perform one (blocking) read.
    fn blockingRead(ctx: *anyopaque) bool {
        const self: *Client = @ptrCast(@alignCast(ctx));
        return self.readSocket();
    }

    // Callback for http.cdp_client: restore the original (non-blocking)
    // socket flags.
    fn blockingReadStop(ctx: *anyopaque) bool {
        const self: *Client = @ptrCast(@alignCast(ctx));
        _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
            log.warn(.app, "CDP blockingReadStop", .{ .err = err });
            return false;
        };
        return true;
    }

    // Reads available bytes into the reader buffer and processes them.
    // Returns false when the connection should be torn down.
    fn readSocket(self: *Client) bool {
        const n = posix.read(self.socket, self.readBuf()) catch |err| {
            log.warn(.app, "CDP read", .{ .err = err });
            return false;
        };
        if (n == 0) {
            log.info(.app, "CDP disconnect", .{});
            return false;
        }
        return self.processData(n) catch false;
    }

    fn readBuf(self: *Client) []u8 {
        return self.reader.readBuf();
    }

    // Dispatches newly-read bytes to the HTTP or websocket path
    // depending on the current mode.
    fn processData(self: *Client, len: usize) !bool {
        self.reader.len += len;
        switch (self.mode) {
            .cdp => |*cdp| return self.processWebsocketMessage(cdp),
            .http => return self.processHTTPRequest(),
        }
    }

    // Accumulates the HTTP request until the terminating CRLFCRLF, then
    // dispatches it, mapping handler errors to HTTP error responses.
    fn processHTTPRequest(self: *Client) !bool {
        lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos });
        const request = self.reader.buf[0..self.reader.len];
        if (request.len > MAX_HTTP_REQUEST_SIZE) {
            self.writeHTTPErrorResponse(413, "Request too large");
            return error.RequestTooLarge;
        }
        // Headers not complete yet; keep the connection and read more.
        if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) {
            return true;
        }
        defer self.reader.len = 0;
        return self.handleHTTPRequest(request) catch |err| {
            switch (err) {
                error.NotFound => self.writeHTTPErrorResponse(404, "Not found"),
                error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"),
                error.InvalidProtocol => self.writeHTTPErrorResponse(400, "Invalid HTTP protocol"),
                error.MissingHeaders => self.writeHTTPErrorResponse(400, "Missing required header"),
                error.InvalidUpgradeHeader => self.writeHTTPErrorResponse(400, "Unsupported upgrade type"),
                error.InvalidVersionHeader => self.writeHTTPErrorResponse(400, "Invalid websocket version"),
                error.InvalidConnectionHeader => self.writeHTTPErrorResponse(400, "Invalid connection header"),
                else => {
                    log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] });
                    self.writeHTTPErrorResponse(500, "Internal Server Error");
                },
            }
            return err;
        };
    }

    // Routes GET / (websocket upgrade) and GET /json/version; everything
    // else is a 404. Returns false to close the connection.
    fn handleHTTPRequest(self: *Client, request: []u8) !bool {
        // Shortest valid request we accept; anything smaller is malformed.
        if (request.len < 18) {
            return error.InvalidRequest;
        }
        if (std.mem.eql(u8, request[0..4], "GET ") == false) {
            return error.NotFound;
        }
        const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse {
            return error.InvalidRequest;
        };
        const url = request[4..url_end];
        if (std.mem.eql(u8, url, "/")) {
            try self.upgradeConnection(request);
            return true;
        }
        if (std.mem.eql(u8, url, "/json/version")) {
            try self.send(self.session_thread.json_version_response);
            // Half-close so the peer sees EOF after the response.
            try posix.shutdown(self.socket, .recv);
            return false;
        }
        return error.NotFound;
    }

    // Validates the websocket handshake headers, computes the
    // Sec-Websocket-Accept key (RFC 6455), sends the 101 response, and
    // switches mode to .cdp.
    fn upgradeConnection(self: *Client, request: []u8) !void {
        // processHTTPRequest guarantees a CRLF exists.
        const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable;
        const request_line = request[0..request_line_end];
        if (!std.ascii.endsWithIgnoreCase(request_line, "http/1.1")) {
            return error.InvalidProtocol;
        }
        var key: []const u8 = "";
        // Bitmask of required headers seen: 1 upgrade, 2 version,
        // 4 connection, 8 key; all four (15) must be present.
        var required_headers: u8 = 0;
        var buf = request[request_line_end + 2 ..];
        while (buf.len > 4) {
            const index = std.mem.indexOfScalar(u8, buf, '\r') orelse unreachable;
            const separator = std.mem.indexOfScalar(u8, buf[0..index], ':') orelse return error.InvalidRequest;
            const name = std.mem.trim(u8, toLower(buf[0..separator]), &std.ascii.whitespace);
            const value = std.mem.trim(u8, buf[(separator + 1)..index], &std.ascii.whitespace);
            if (std.mem.eql(u8, name, "upgrade")) {
                if (!std.ascii.eqlIgnoreCase("websocket", value)) {
                    return error.InvalidUpgradeHeader;
                }
                required_headers |= 1;
            } else if (std.mem.eql(u8, name, "sec-websocket-version")) {
                if (value.len != 2 or value[0] != '1' or value[1] != '3') {
                    return error.InvalidVersionHeader;
                }
                required_headers |= 2;
            } else if (std.mem.eql(u8, name, "connection")) {
                if (std.ascii.indexOfIgnoreCase(value, "upgrade") == null) {
                    return error.InvalidConnectionHeader;
                }
                required_headers |= 4;
            } else if (std.mem.eql(u8, name, "sec-websocket-key")) {
                key = value;
                required_headers |= 8;
            }
            const next = index + 2;
            buf = buf[next..];
        }
        if (required_headers != 15) {
            return error.MissingHeaders;
        }
        const alloc = self.send_arena.allocator();
        const response = blk: {
            // The zero run is a placeholder overwritten below with the
            // base64 SHA-1 accept key (28 chars, ending before \r\n\r\n).
            const template =
                "HTTP/1.1 101 Switching Protocols\r\n" ++
                "Upgrade: websocket\r\n" ++
                "Connection: upgrade\r\n" ++
                "Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n";
            const res = try alloc.dupe(u8, template);
            const key_pos = res.len - 32;
            var h: [20]u8 = undefined;
            var hasher = std.crypto.hash.Sha1.init(.{});
            hasher.update(key);
            // RFC 6455 magic GUID appended to the client key.
            hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
            hasher.final(&h);
            _ = std.base64.standard.Encoder.encode(res[key_pos .. key_pos + 28], h[0..]);
            break :blk res;
        };
        self.mode = .{ .cdp = try CDP.init(self.session_thread.shared, self.session_thread.http_client.?, self) };
        return self.send(response);
    }

    // Writes a fixed HTTP error response; send failures are ignored since
    // the connection is being torn down anyway.
    fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void {
        const response = std.fmt.comptimePrint(
            "HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}",
            .{ status, body.len, body },
        );
        self.send(response) catch {};
    }

    // Drains all complete websocket messages from the reader, answering
    // pings/closes and forwarding text/binary payloads to CDP.
    // Returns false when the connection should close.
    fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool {
        var reader = &self.reader;
        while (true) {
            const msg = reader.next() catch |err| {
                // Best-effort close frame matching the protocol error.
                switch (err) {
                    error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {},
                    error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.NestedFragementation => self.send(&CLOSE_PROTOCOL_ERROR) catch {},
                    error.OutOfMemory => {},
                }
                return err;
            } orelse break;
            switch (msg.type) {
                .pong => {},
                .ping => try self.sendPong(msg.data),
                .close => {
                    self.send(&CLOSE_NORMAL) catch {};
                    return false;
                },
                .text, .binary => if (cdp.handleMessage(msg.data) == false) {
                    return false;
                },
            }
            // Fragmented messages own heap storage that must be released
            // once the payload has been consumed.
            if (msg.cleanup_fragment) {
                reader.cleanup();
            }
        }
        reader.compact();
        return true;
    }

    // Replies to a ping, echoing its payload per RFC 6455.
    fn sendPong(self: *Client, data: []const u8) !void {
        if (data.len == 0) {
            return self.send(&EMPTY_PONG);
        }
        var header_buf: [10]u8 = undefined;
        const header = websocketHeader(&header_buf, .pong, data.len);
        const alloc = self.send_arena.allocator();
        var framed = try alloc.alloc(u8, header.len + data.len);
        @memcpy(framed[0..header.len], header);
        @memcpy(framed[header.len..], data);
        return self.send(framed);
    }

    // Serializes `message` as JSON into a websocket text frame and sends
    // it. Ten bytes are reserved up front for the (variable-size) header,
    // which is written just before the payload starts.
    pub fn sendJSON(self: *Client, message: anytype, opts: std.json.Stringify.Options) !void {
        const alloc = self.send_arena.allocator();
        var aw: std.Io.Writer.Allocating = .init(alloc);
        try aw.writer.writeAll(&.{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 });
        try std.json.Stringify.value(message, opts, &aw.writer);
        const written = aw.written();
        // Fill in websocket header
        var header_buf: [10]u8 = undefined;
        const payload_len = written.len - 10;
        const header = websocketHeader(&header_buf, .text, payload_len);
        const start = 10 - header.len;
        // Copy header into the reserved space
        const data = @constCast(written);
        @memcpy(data[start..10], header);
        return self.send(data[start..]);
    }

    // Like sendJSON, but for a caller-built buffer whose first 10 bytes
    // are reserved for the websocket header.
    pub fn sendJSONRaw(self: *Client, buf: std.ArrayListUnmanaged(u8)) !void {
        var header_buf: [10]u8 = undefined;
        const payload_len = buf.items.len - 10;
        const header = websocketHeader(&header_buf, .text, payload_len);
        const start = 10 - header.len;
        const message = buf.items;
        @memcpy(message[start..10], header);
        return self.send(message[start..]);
    }

    // Writes `data` fully to the socket. If a non-blocking write would
    // block, the socket is flipped to blocking mode for the remainder and
    // the original flags are restored on exit. The send arena is reset
    // afterwards (so frames built in it are only valid for one send).
    fn send(self: *Client, data: []const u8) !void {
        var pos: usize = 0;
        var changed_to_blocking: bool = false;
        defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
        defer if (changed_to_blocking) {
            _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
                log.err(.app, "CDP restore nonblocking", .{ .err = err });
            };
        };
        LOOP: while (pos < data.len) {
            const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
                error.WouldBlock => {
                    lp.assert(changed_to_blocking == false, "Client.double block", .{});
                    changed_to_blocking = true;
                    _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
                    continue :LOOP;
                },
                else => return err,
            };
            if (written == 0) {
                return error.Closed;
            }
            pos += written;
        }
    }
};
// WebSocket message reader
// Parses RFC 6455 frames out of an internal growable buffer, handling
// masking, fragmentation (continuation frames) and control frames.
// EXPECT_MASK selects the server role: client-to-server frames must be
// masked; server-to-client frames must not be.
fn Reader(comptime EXPECT_MASK: bool) type {
    return struct {
        allocator: Allocator,
        // pos..len delimits the unconsumed bytes inside buf.
        pos: usize = 0,
        len: usize = 0,
        buf: []u8,
        // In-progress fragmented message, if any.
        fragments: ?Fragments = null,

        const Self = @This();

        fn init(alloc: Allocator) !Self {
            const buf = try alloc.alloc(u8, 16 * 1024);
            return .{
                .buf = buf,
                .allocator = alloc,
            };
        }

        fn deinit(self: *Self) void {
            self.cleanup();
            self.allocator.free(self.buf);
        }

        // Frees any partially-assembled fragmented message.
        fn cleanup(self: *Self) void {
            if (self.fragments) |*f| {
                f.message.deinit(self.allocator);
                self.fragments = null;
            }
        }

        // Free space at the end of the buffer for the next socket read.
        fn readBuf(self: *Self) []u8 {
            return self.buf[self.len..];
        }

        // Returns the next complete message, null when more data is
        // needed, or a protocol error. Grows the buffer on demand.
        fn next(self: *Self) !?Message {
            LOOP: while (true) {
                var buf = self.buf[self.pos..self.len];
                const length_of_len, const message_len = extractLengths(buf) orelse {
                    // Frame header not complete yet.
                    return null;
                };
                const byte1 = buf[0];
                // RSV1-3 (0x70) must be zero: no extensions negotiated.
                if (byte1 & 112 != 0) {
                    return error.ReservedFlags;
                }
                if (comptime EXPECT_MASK) {
                    if (buf[1] & 128 != 128) {
                        return error.NotMasked;
                    }
                } else if (buf[1] & 128 != 0) {
                    return error.Masked;
                }
                var is_control = false;
                var is_continuation = false;
                var message_type: Message.Type = undefined;
                switch (byte1 & 15) {
                    0 => is_continuation = true,
                    1 => message_type = .text,
                    2 => message_type = .binary,
                    8 => {
                        is_control = true;
                        message_type = .close;
                    },
                    9 => {
                        is_control = true;
                        message_type = .ping;
                    },
                    10 => {
                        is_control = true;
                        message_type = .pong;
                    },
                    else => return error.InvalidMessageType,
                }
                if (is_control) {
                    // RFC 6455 limits control payloads to 125 bytes.
                    if (message_len > 125) {
                        return error.ControlTooLarge;
                    }
                } else if (message_len > MAX_MESSAGE_SIZE) {
                    return error.TooLarge;
                } else if (message_len > self.buf.len) {
                    // Message cannot fit: grow the buffer, then wait for
                    // more data to arrive.
                    const len_now = self.buf.len;
                    self.buf = try growBuffer(self.allocator, self.buf, message_len);
                    buf = self.buf[0..len_now];
                    return null;
                } else if (buf.len < message_len) {
                    // Frame not fully received yet.
                    return null;
                }
                const header_len = 2 + length_of_len + if (comptime EXPECT_MASK) 4 else 0;
                const payload = buf[header_len..message_len];
                if (comptime EXPECT_MASK) {
                    // Unmask in place with the 4-byte key before the payload.
                    mask(buf[header_len - 4 .. header_len], payload);
                }
                self.pos += message_len;
                const fin = byte1 & 128 == 128;
                if (is_continuation) {
                    const fragments = &(self.fragments orelse return error.InvalidContinuation);
                    if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) {
                        return error.TooLarge;
                    }
                    try fragments.message.appendSlice(self.allocator, payload);
                    if (fin == false) {
                        continue :LOOP;
                    }
                    // Final continuation frame: hand out the assembled
                    // message; caller must invoke cleanup() afterwards.
                    return .{
                        .type = fragments.type,
                        .data = fragments.message.items,
                        .cleanup_fragment = true,
                    };
                }
                const can_be_fragmented = message_type == .text or message_type == .binary;
                if (self.fragments != null and can_be_fragmented) {
                    return error.NestedFragementation;
                }
                if (fin == false) {
                    // First frame of a fragmented message: start buffering.
                    if (can_be_fragmented == false) {
                        return error.InvalidContinuation;
                    }
                    var fragments = Fragments{ .message = .{}, .type = message_type };
                    try fragments.message.appendSlice(self.allocator, payload);
                    self.fragments = fragments;
                    continue :LOOP;
                }
                // Unfragmented message: payload points into self.buf.
                return .{
                    .data = payload,
                    .type = message_type,
                    .cleanup_fragment = false,
                };
            }
        }

        // Returns {extended-length field size, total frame length
        // including header and mask}, or null if the header is not
        // complete yet.
        fn extractLengths(buf: []const u8) ?struct { usize, usize } {
            if (buf.len < 2) {
                return null;
            }
            const length_of_len: usize = switch (buf[1] & 127) {
                126 => 2,
                127 => 8,
                else => 0,
            };
            if (buf.len < length_of_len + 2) {
                return null;
            }
            const message_length = switch (length_of_len) {
                2 => @as(u16, @intCast(buf[3])) | @as(u16, @intCast(buf[2])) << 8,
                8 => @as(u64, @intCast(buf[9])) | @as(u64, @intCast(buf[8])) << 8 | @as(u64, @intCast(buf[7])) << 16 | @as(u64, @intCast(buf[6])) << 24 | @as(u64, @intCast(buf[5])) << 32 | @as(u64, @intCast(buf[4])) << 40 | @as(u64, @intCast(buf[3])) << 48 | @as(u64, @intCast(buf[2])) << 56,
                else => buf[1] & 127,
            } + length_of_len + 2 + if (comptime EXPECT_MASK) 4 else 0;
            return .{ length_of_len, message_length };
        }

        // Moves any partial frame to the front of the buffer so readBuf()
        // exposes maximal free space. Skipped when the pending frame's
        // missing bytes already fit in the remaining tail.
        fn compact(self: *Self) void {
            const pos = self.pos;
            const len_now = self.len;
            lp.assert(pos <= len_now, "Client.Reader.compact precondition", .{ .pos = pos, .len = len_now });
            const partial_bytes = len_now - pos;
            if (partial_bytes == 0) {
                // Everything consumed: just reset the window.
                self.pos = 0;
                self.len = 0;
                return;
            }
            const partial = self.buf[pos..len_now];
            if (extractLengths(partial)) |length_meta| {
                const next_message_len = length_meta.@"1";
                lp.assert(pos <= len_now, "Client.Reader.compact postcondition", .{ .next_len = next_message_len, .partial = partial_bytes });
                const missing_bytes = next_message_len - partial_bytes;
                const free_space = self.buf.len - len_now;
                if (missing_bytes < free_space) {
                    return;
                }
            }
            std.mem.copyForwards(u8, self.buf, partial);
            self.pos = 0;
            self.len = partial_bytes;
        }
    };
}
/// Grows `buf` geometrically (~1.5x per step, minimum +8) until it can
/// hold `required_capacity` bytes. Tries an in-place resize first and
/// only reallocates + copies when that fails. Existing contents are
/// preserved; on reallocation the old buffer is freed.
fn growBuffer(alloc: Allocator, buf: []u8, required_capacity: usize) ![]u8 {
    // Always grow at least one step (saturating add guards against overflow).
    var capacity = buf.len;
    capacity +|= capacity / 2 + 8;
    while (capacity < required_capacity) {
        capacity +|= capacity / 2 + 8;
    }

    log.debug(.app, "CDP buffer growth", .{ .from = buf.len, .to = capacity });

    // Cheap path: the allocator can extend the allocation in place.
    if (alloc.resize(buf, capacity)) {
        return buf.ptr[0..capacity];
    }

    // Slow path: fresh allocation, copy, release the old buffer.
    const replacement = try alloc.alloc(u8, capacity);
    @memcpy(replacement[0..buf.len], buf);
    alloc.free(buf);
    return replacement;
}
// Accumulates payloads of a fragmented websocket message until the FIN
// frame arrives. `type` records the opcode of the initial fragment.
const Fragments = struct {
    type: Message.Type,
    message: std.ArrayListUnmanaged(u8),
};
// A fully-assembled websocket message as handed to the caller.
const Message = struct {
    type: Type,
    // Payload bytes: a view into the read buffer, or into the fragment
    // accumulator when the message was reassembled (see cleanup_fragment).
    data: []const u8,
    // True when `data` is backed by the fragment accumulator and the
    // caller must release the fragment state after consuming it.
    cleanup_fragment: bool,

    const Type = enum {
        text,
        binary,
        close,
        ping,
        pong,
    };
};
// Websocket opcodes for outbound frames, pre-combined with the FIN bit
// (0x80) since every frame we send is unfragmented.
const OpCode = enum(u8) {
    text = 128 | 1,
    close = 128 | 8,
    pong = 128 | 10,
};
fn websocketHeader(buf: []u8, op_code: OpCode, payload_len: usize) []const u8 {
lp.assert(buf.len == 10, "Websocket.Header", .{ .len = buf.len });
const len = payload_len;
buf[0] = 128 | @intFromEnum(op_code);
if (len <= 125) {
buf[1] = @intCast(len);
return buf[0..2];
}
if (len < 65536) {
buf[1] = 126;
buf[2] = @intCast((len >> 8) & 0xFF);
buf[3] = @intCast(len & 0xFF);
return buf[0..4];
}
buf[1] = 127;
buf[2] = 0;
buf[3] = 0;
buf[4] = 0;
buf[5] = 0;
buf[6] = @intCast((len >> 24) & 0xFF);
buf[7] = @intCast((len >> 16) & 0xFF);
buf[8] = @intCast((len >> 8) & 0xFF);
buf[9] = @intCast(len & 0xFF);
return buf[0..10];
}
/// Lowercases `str` in place (ASCII only) and returns it for chaining.
fn toLower(str: []u8) []u8 {
    for (str) |*ch| {
        ch.* = std.ascii.toLower(ch.*);
    }
    return str;
}
// Only the LLVM and C backends reliably support SIMD vector types; other
// backends take the scalar unmasking path in mask().
const backend_supports_vectors = switch (builtin.zig_backend) {
    .stage2_llvm, .stage2_c => true,
    else => false,
};
/// Unmasks a websocket payload in place by XOR-ing it with the 4-byte
/// masking key `m` (RFC 6455 §5.3; XOR is its own inverse). Processes
/// vector_size bytes at a time with SIMD when the backend supports it,
/// then hands the sub-vector-size tail to the scalar loop.
fn mask(m: []const u8, payload: []u8) void {
    var data = payload;
    if (!comptime backend_supports_vectors) return simpleMask(m, data);
    const vector_size = std.simd.suggestVectorLength(u8) orelse @sizeOf(usize);
    if (data.len >= vector_size) {
        // Tile the 4-byte key across a full vector so one XOR covers
        // vector_size bytes while keeping key bytes aligned to i & 3.
        // NOTE(review): assumes vector_size is a multiple of 4 — true for
        // suggestVectorLength(u8) and @sizeOf(usize) on supported targets,
        // but worth confirming if either ever changes.
        const mask_vector = std.simd.repeat(vector_size, @as(@Vector(4, u8), m[0..4].*));
        while (data.len >= vector_size) {
            const slice = data[0..vector_size];
            const masked_data_slice: @Vector(vector_size, u8) = slice.*;
            slice.* = masked_data_slice ^ mask_vector;
            data = data[vector_size..];
        }
    }
    // Remaining tail (fewer than vector_size bytes).
    simpleMask(m, data);
}
/// Scalar fallback: XOR each payload byte in place with the repeating
/// 4-byte masking key (i & 3 cycles through the key).
fn simpleMask(m: []const u8, payload: []u8) void {
    var i: usize = 0;
    while (i < payload.len) : (i += 1) {
        payload[i] ^= m[i & 3];
    }
}

266
src/SharedState.zig Normal file
View File

@@ -0,0 +1,266 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig");
const Http = @import("http/Http.zig");
const HttpClient = @import("http/Client.zig");
const CurlShare = @import("http/CurlShare.zig");
const Snapshot = @import("browser/js/Snapshot.zig");
const Platform = @import("browser/js/Platform.zig");
const Notification = @import("Notification.zig");
const App = @import("App.zig");
const c = Http.c;
/// SharedState holds all state shared between CDP sessions (read-only after init).
/// Each SessionThread gets a reference to this and can create its own resources
/// (like HttpClient) that use the shared components.
const SharedState = @This();

platform: Platform, // V8 platform (process-wide)
snapshot: Snapshot, // V8 startup snapshot
ca_blob: ?c.curl_blob, // TLS certificates (null when tls_verify_host is off)
http_opts: Http.Opts, // HTTP configuration applied to every session's client
curl_share: *CurlShare, // Shared HTTP resources (DNS, TLS, connections)
notification: *Notification, // Global notification hub
allocator: Allocator, // Thread-safe allocator
arena: ArenaAllocator, // Arena for shared resources (proxy header, PEM cert bundle)
owns_v8_resources: bool, // Track whether V8 resources are owned or borrowed from App
pub const Config = struct {
    max_sessions: u32 = 10, // Max concurrent CDP connections
    session_memory_limit: usize = 64 * 1024 * 1024, // 64MB per session
    run_mode: App.RunMode,
    tls_verify_host: bool = true, // also gates loading the system cert bundle in init()
    http_proxy: ?[:0]const u8 = null,
    proxy_bearer_token: ?[:0]const u8 = null, // raw token; rendered into a Proxy-Authorization header at init
    http_timeout_ms: ?u31 = null, // null => 5000
    http_connect_timeout_ms: ?u31 = null, // null => 0
    http_max_host_open: ?u8 = null, // null => 4
    http_max_concurrent: ?u8 = null, // null => 10
    user_agent: [:0]const u8,
};
/// Creates a fully-owned SharedState: V8 platform + snapshot, root
/// notification hub, HTTP options (defaults filled in), optional CA
/// bundle, and the curl share handle. Caller owns the result; release
/// with deinit(). Each errdefer unwinds exactly what succeeded so far.
pub fn init(allocator: Allocator, config: Config) !*SharedState {
    const self = try allocator.create(SharedState);
    errdefer allocator.destroy(self);
    self.allocator = allocator;
    // Arena backs long-lived derived data (proxy header, PEM bundle).
    self.arena = ArenaAllocator.init(allocator);
    errdefer self.arena.deinit();
    // Initialize V8 platform (process-wide singleton)
    self.platform = try Platform.init();
    errdefer self.platform.deinit();
    // Load V8 startup snapshot
    self.snapshot = try Snapshot.load();
    errdefer self.snapshot.deinit();
    // We created platform/snapshot here, so deinit() must tear them down.
    self.owns_v8_resources = true;
    // Initialize notification hub (no parent: this is the root hub)
    self.notification = try Notification.init(allocator, null);
    errdefer self.notification.deinit();
    // Build HTTP options, applying defaults for unset values.
    // NOTE(review): config.max_sessions, session_memory_limit and run_mode
    // are not consumed here — presumably read by the server; confirm.
    const arena_alloc = self.arena.allocator();
    var adjusted_opts = Http.Opts{
        .max_host_open = config.http_max_host_open orelse 4,
        .max_concurrent = config.http_max_concurrent orelse 10,
        .timeout_ms = config.http_timeout_ms orelse 5000,
        .connect_timeout_ms = config.http_connect_timeout_ms orelse 0,
        .http_proxy = config.http_proxy,
        .tls_verify_host = config.tls_verify_host,
        .proxy_bearer_token = config.proxy_bearer_token,
        .user_agent = config.user_agent,
    };
    // Render the raw token into a full header line once, in the arena.
    if (config.proxy_bearer_token) |bt| {
        adjusted_opts.proxy_bearer_token = try std.fmt.allocPrintSentinel(arena_alloc, "Proxy-Authorization: Bearer {s}", .{bt}, 0);
    }
    self.http_opts = adjusted_opts;
    // Load TLS certificates (only when we actually verify peers)
    if (config.tls_verify_host) {
        self.ca_blob = try loadCerts(allocator, arena_alloc);
    } else {
        self.ca_blob = null;
    }
    // Initialize curl share handle for shared resources
    self.curl_share = try CurlShare.init(allocator);
    errdefer self.curl_share.deinit();
    return self;
}
/// Create SharedState by borrowing V8 resources from an existing App.
/// Use this when App is already initialized (e.g., in tests).
/// The platform/snapshot are NOT torn down by deinit() in this mode, and
/// the notification hub is created as a child of the App's hub.
pub fn initFromApp(app: *App, allocator: Allocator) !*SharedState {
    const self = try allocator.create(SharedState);
    errdefer allocator.destroy(self);
    self.allocator = allocator;
    // Arena backs long-lived derived data (proxy header, PEM bundle).
    self.arena = ArenaAllocator.init(allocator);
    errdefer self.arena.deinit();
    // Borrow V8 resources from App (don't initialize new ones)
    self.platform = app.platform;
    self.snapshot = app.snapshot;
    self.owns_v8_resources = false;
    // Initialize notification hub, parented to the App's global hub
    self.notification = try Notification.init(allocator, app.notification);
    errdefer self.notification.deinit();
    // Build HTTP options from App config, applying defaults for unset values
    const config = app.config;
    const arena_alloc = self.arena.allocator();
    var adjusted_opts = Http.Opts{
        .max_host_open = config.http_max_host_open orelse 4,
        .max_concurrent = config.http_max_concurrent orelse 10,
        .timeout_ms = config.http_timeout_ms orelse 5000,
        .connect_timeout_ms = config.http_connect_timeout_ms orelse 0,
        .http_proxy = config.http_proxy,
        .tls_verify_host = config.tls_verify_host,
        .proxy_bearer_token = config.proxy_bearer_token,
        .user_agent = config.user_agent,
    };
    // Render the raw token into a full header line once, in the arena.
    if (config.proxy_bearer_token) |bt| {
        adjusted_opts.proxy_bearer_token = try std.fmt.allocPrintSentinel(arena_alloc, "Proxy-Authorization: Bearer {s}", .{bt}, 0);
    }
    self.http_opts = adjusted_opts;
    // Load TLS certificates (only when we actually verify peers)
    if (config.tls_verify_host) {
        self.ca_blob = try loadCerts(allocator, arena_alloc);
    } else {
        self.ca_blob = null;
    }
    // Initialize curl share handle for shared resources
    self.curl_share = try CurlShare.init(allocator);
    errdefer self.curl_share.deinit();
    return self;
}
/// Releases everything init()/initFromApp() acquired. V8 snapshot and
/// platform are only torn down when this instance owns them (built with
/// init(), not initFromApp()).
pub fn deinit(self: *SharedState) void {
    // Copy the allocator out first: destroy(self) invalidates self.
    const allocator = self.allocator;
    self.notification.deinit();
    self.curl_share.deinit();
    // Only cleanup V8 resources if we own them
    if (self.owns_v8_resources) {
        self.snapshot.deinit();
        self.platform.deinit();
    }
    self.arena.deinit();
    allocator.destroy(self);
}
/// Create a new HTTP client for a session thread.
/// The client will use the shared curl_share for DNS, TLS, and connection pooling.
/// Allocated with `session_allocator`; the caller is responsible for
/// deinit-ing the client before that allocator is destroyed.
pub fn createHttpClient(self: *SharedState, session_allocator: Allocator) !*HttpClient {
    return HttpClient.init(
        session_allocator,
        self.ca_blob,
        self.http_opts,
        self.curl_share.getHandle(),
    );
}
// Adapted from Http.zig
/// Collects the system's root certificates and re-encodes them as a PEM
/// bundle suitable for a curl CA blob. The PEM text is allocated in
/// `arena` and lives as long as that arena; `allocator` is only used for
/// the temporary certificate scan.
fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
    var bundle: std.crypto.Certificate.Bundle = .{};
    try bundle.rescan(allocator);
    defer bundle.deinit(allocator);
    const bytes = bundle.bytes.items;
    if (bytes.len == 0) {
        log.warn(.app, "No system certificates", .{});
        // NOTE(review): bytes.ptr dangles once the deferred bundle.deinit
        // runs, but len == 0 means curl should never dereference it —
        // confirm, or prefer a null data pointer.
        return .{
            .len = 0,
            .flags = 0,
            .data = bytes.ptr,
        };
    }
    const encoder = std.base64.standard.Encoder;
    var arr: std.ArrayListUnmanaged(u8) = .empty;
    // Capacity estimate: base64 payload + ~75 bytes of BEGIN/END armor per
    // certificate + one newline per 64 encoded characters.
    const encoded_size = encoder.calcSize(bytes.len);
    const buffer_size = encoded_size +
        (bundle.map.count() * 75) +
        (encoded_size / 64);
    try arr.ensureTotalCapacity(arena, buffer_size);
    var writer = arr.writer(arena);
    var it = bundle.map.valueIterator();
    while (it.next()) |index| {
        // Each map value is the offset of one DER certificate in `bytes`.
        const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*);
        try writer.writeAll("-----BEGIN CERTIFICATE-----\n");
        // LineWriter wraps the base64 output at 64 columns (PEM format).
        var line_writer = LineWriter{ .inner = writer };
        try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]);
        try writer.writeAll("\n-----END CERTIFICATE-----\n");
    }
    return .{
        .len = arr.items.len,
        .data = arr.items.ptr,
        .flags = 0,
    };
}
/// Writer adapter that inserts a '\n' after every 64 output columns (PEM
/// line width), tracking the current column across calls.
/// Fix: the full-line loop previously reset `remain = data[len..]` each
/// iteration instead of advancing past the 64 bytes just written, so any
/// write long enough to loop never terminated (emitting the same 64
/// bytes forever). It now advances with `remain = remain[64..]`.
const LineWriter = struct {
    col: usize = 0,
    inner: std.ArrayListUnmanaged(u8).Writer,

    pub fn writeAll(self: *LineWriter, data: []const u8) !void {
        var col = self.col;
        var remain = data;

        // Finish the current (possibly partial) line first.
        const room = 64 - col;
        if (remain.len > room) {
            try self.inner.writeAll(remain[0..room]);
            try self.inner.writeByte('\n');
            remain = remain[room..];
            col = 0;
        }

        // Emit complete 64-column lines.
        while (remain.len > 64) {
            try self.inner.writeAll(remain[0..64]);
            try self.inner.writeByte('\n');
            remain = remain[64..];
        }

        // Leftover bytes start (or extend) the next line.
        try self.inner.writeAll(remain);
        self.col = col + remain.len;
    }
};

View File

@@ -24,6 +24,7 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const js = @import("js/js.zig"); const js = @import("js/js.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const SharedState = @import("../SharedState.zig");
const HttpClient = @import("../http/Client.zig"); const HttpClient = @import("../http/Client.zig");
const Notification = @import("../Notification.zig"); const Notification = @import("../Notification.zig");
@@ -37,7 +38,8 @@ const Session = @import("Session.zig");
const Browser = @This(); const Browser = @This();
env: js.Env, env: js.Env,
app: *App, shared: ?*SharedState,
app: ?*App,
session: ?Session, session: ?Session,
allocator: Allocator, allocator: Allocator,
http_client: *HttpClient, http_client: *HttpClient,
@@ -47,7 +49,33 @@ session_arena: ArenaAllocator,
transfer_arena: ArenaAllocator, transfer_arena: ArenaAllocator,
notification: *Notification, notification: *Notification,
pub fn init(app: *App) !Browser { /// Initialize a Browser with SharedState (for multi-session CDP mode)
pub fn init(shared: *SharedState, http_client: *HttpClient, allocator: Allocator) !Browser {
var env = try js.Env.init(allocator, &shared.platform, &shared.snapshot);
errdefer env.deinit();
const notification = try Notification.init(allocator, shared.notification);
http_client.notification = notification;
http_client.next_request_id = 0; // Should we track ids in CDP only?
errdefer notification.deinit();
return .{
.shared = shared,
.app = null,
.env = env,
.session = null,
.allocator = allocator,
.notification = notification,
.http_client = http_client,
.call_arena = ArenaAllocator.init(allocator),
.page_arena = ArenaAllocator.init(allocator),
.session_arena = ArenaAllocator.init(allocator),
.transfer_arena = ArenaAllocator.init(allocator),
};
}
/// Initialize a Browser with App (for single-session mode like fetch)
pub fn initFromApp(app: *App) !Browser {
const allocator = app.allocator; const allocator = app.allocator;
var env = try js.Env.init(allocator, &app.platform, &app.snapshot); var env = try js.Env.init(allocator, &app.platform, &app.snapshot);
@@ -55,10 +83,11 @@ pub fn init(app: *App) !Browser {
const notification = try Notification.init(allocator, app.notification); const notification = try Notification.init(allocator, app.notification);
app.http.client.notification = notification; app.http.client.notification = notification;
app.http.client.next_request_id = 0; // Should we track ids in CDP only? app.http.client.next_request_id = 0;
errdefer notification.deinit(); errdefer notification.deinit();
return .{ return .{
.shared = null,
.app = app, .app = app,
.env = env, .env = env,
.session = null, .session = null,

View File

@@ -66,7 +66,7 @@ pub fn init(self: *Session, browser: *Browser) !void {
var executor = try browser.env.newExecutionWorld(); var executor = try browser.env.newExecutionWorld();
errdefer executor.deinit(); errdefer executor.deinit();
const allocator = browser.app.allocator; const allocator = browser.allocator;
const session_allocator = browser.session_arena.allocator(); const session_allocator = browser.session_arena.allocator();
self.* = .{ self.* = .{
@@ -86,7 +86,7 @@ pub fn deinit(self: *Session) void {
self.removePage(); self.removePage();
} }
self.cookie_jar.deinit(); self.cookie_jar.deinit();
self.storage_shed.deinit(self.browser.app.allocator); self.storage_shed.deinit(self.browser.allocator);
self.executor.deinit(); self.executor.deinit();
} }

View File

@@ -27,7 +27,15 @@ _pad: bool = false,
pub const init: Navigator = .{}; pub const init: Navigator = .{};
pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 { pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 {
return page._session.browser.app.config.user_agent; const browser = page._session.browser;
// Handle both modes: SharedState (multi-session) or App (single-session)
if (browser.shared) |shared| {
return shared.http_opts.user_agent;
} else if (browser.app) |app| {
return app.config.user_agent;
} else {
return "Lightpanda/1.0";
}
} }
pub fn getAppName(_: *const Navigator) []const u8 { pub fn getAppName(_: *const Navigator) []const u8 {

View File

@@ -25,7 +25,8 @@ const json = std.json;
const log = @import("../log.zig"); const log = @import("../log.zig");
const js = @import("../browser/js/js.zig"); const js = @import("../browser/js/js.zig");
const App = @import("../App.zig"); const SharedState = @import("../SharedState.zig");
const HttpClient = @import("../http/Client.zig");
const Browser = @import("../browser/Browser.zig"); const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig"); const Session = @import("../browser/Session.zig");
const Page = @import("../browser/Page.zig"); const Page = @import("../browser/Page.zig");
@@ -78,9 +79,25 @@ pub fn CDPT(comptime TypeProvider: type) type {
const Self = @This(); const Self = @This();
pub fn init(app: *App, client: TypeProvider.Client) !Self { pub fn init(shared: *SharedState, http_client: *HttpClient, client: TypeProvider.Client) !Self {
const allocator = shared.allocator;
const browser = try Browser.init(shared, http_client, allocator);
errdefer browser.deinit();
return .{
.client = client,
.browser = browser,
.allocator = allocator,
.browser_context = null,
.message_arena = std.heap.ArenaAllocator.init(allocator),
.notification_arena = std.heap.ArenaAllocator.init(allocator),
};
}
/// Initialize CDP with App (for testing and single-session mode)
pub fn initFromApp(app: *lp.App, client: TypeProvider.Client) !Self {
const allocator = app.allocator; const allocator = app.allocator;
const browser = try Browser.init(app); const browser = try Browser.initFromApp(app);
errdefer browser.deinit(); errdefer browser.deinit();
return .{ return .{

View File

@@ -84,7 +84,8 @@ const TestContext = struct {
self.client = Client.init(self.arena.allocator()); self.client = Client.init(self.arena.allocator());
// Don't use the arena here. We want to detect leaks in CDP. // Don't use the arena here. We want to detect leaks in CDP.
// The arena is only for test-specific stuff // The arena is only for test-specific stuff
self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; // Use test_app from base testing (reuses existing V8 platform)
self.cdp_ = TestCDP.initFromApp(base.test_app, &self.client.?) catch unreachable;
} }
return &self.cdp_.?; return &self.cdp_.?;
} }

View File

@@ -124,7 +124,7 @@ pub const CDPClient = struct {
const TransferQueue = std.DoublyLinkedList; const TransferQueue = std.DoublyLinkedList;
pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, opts: Http.Opts) !*Client { pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, opts: Http.Opts, share_handle: ?*c.CURLSH) !*Client {
var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator); var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator);
errdefer transfer_pool.deinit(); errdefer transfer_pool.deinit();
@@ -136,7 +136,7 @@ pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, opts: Http.Opts) !*Clie
try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, opts.max_host_open))); try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, opts.max_host_open)));
var handles = try Handles.init(allocator, client, ca_blob, &opts); var handles = try Handles.init(allocator, client, ca_blob, &opts, share_handle);
errdefer handles.deinit(allocator); errdefer handles.deinit(allocator);
client.* = .{ client.* = .{
@@ -650,7 +650,7 @@ const Handles = struct {
const HandleList = std.DoublyLinkedList; const HandleList = std.DoublyLinkedList;
// pointer to opts is not stable, don't hold a reference to it! // pointer to opts is not stable, don't hold a reference to it!
fn init(allocator: Allocator, client: *Client, ca_blob: ?c.curl_blob, opts: *const Http.Opts) !Handles { fn init(allocator: Allocator, client: *Client, ca_blob: ?c.curl_blob, opts: *const Http.Opts, share_handle: ?*c.CURLSH) !Handles {
const count = if (opts.max_concurrent == 0) 1 else opts.max_concurrent; const count = if (opts.max_concurrent == 0) 1 else opts.max_concurrent;
const handles = try allocator.alloc(Handle, count); const handles = try allocator.alloc(Handle, count);
@@ -658,7 +658,7 @@ const Handles = struct {
var available: HandleList = .{}; var available: HandleList = .{};
for (0..count) |i| { for (0..count) |i| {
handles[i] = try Handle.init(client, ca_blob, opts); handles[i] = try Handle.init(client, ca_blob, opts, share_handle);
available.append(&handles[i].node); available.append(&handles[i].node);
} }
@@ -706,12 +706,17 @@ pub const Handle = struct {
node: Handles.HandleList.Node, node: Handles.HandleList.Node,
// pointer to opts is not stable, don't hold a reference to it! // pointer to opts is not stable, don't hold a reference to it!
fn init(client: *Client, ca_blob: ?c.curl_blob, opts: *const Http.Opts) !Handle { fn init(client: *Client, ca_blob: ?c.curl_blob, opts: *const Http.Opts, share_handle: ?*c.CURLSH) !Handle {
const conn = try Http.Connection.init(ca_blob, opts); const conn = try Http.Connection.init(ca_blob, opts);
errdefer conn.deinit(); errdefer conn.deinit();
const easy = conn.easy; const easy = conn.easy;
// Configure shared resources (DNS cache, TLS sessions, connections)
if (share_handle) |sh| {
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_SHARE, sh));
}
// callbacks // callbacks
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_HEADERDATA, easy)); try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_HEADERDATA, easy));
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_HEADERFUNCTION, Transfer.headerCallback)); try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_HEADERFUNCTION, Transfer.headerCallback));

108
src/http/CurlShare.zig Normal file
View File

@@ -0,0 +1,108 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const Http = @import("Http.zig");
const c = Http.c;
/// Thread-safe wrapper for libcurl's share handle.
/// Allows multiple CURLM handles (one per session thread) to share:
/// - DNS resolution cache
/// - TLS session resumption data
/// - Connection pool
const CurlShare = @This();

handle: *c.CURLSH, // underlying libcurl share handle
// One mutex per shared data class; libcurl invokes lockFunc/unlockFunc
// with the matching curl_lock_data value to serialize cross-thread access.
dns_mutex: std.Thread.Mutex,
ssl_mutex: std.Thread.Mutex,
conn_mutex: std.Thread.Mutex,
allocator: Allocator, // allocator that created this instance (used by deinit)
/// Creates the share handle and registers the lock/unlock callbacks that
/// make it usable from multiple session threads. Caller owns the result;
/// release with deinit().
pub fn init(allocator: Allocator) !*CurlShare {
    const share = try allocator.create(CurlShare);
    errdefer allocator.destroy(share);
    const handle = c.curl_share_init() orelse return error.FailedToInitializeShare;
    errdefer _ = c.curl_share_cleanup(handle);
    // Fields must be in place before the callbacks can be handed `share`.
    share.* = .{
        .handle = handle,
        .dns_mutex = .{},
        .ssl_mutex = .{},
        .conn_mutex = .{},
        .allocator = allocator,
    };
    // Set up lock/unlock callbacks; USERDATA makes `share` the userptr
    // libcurl passes back to them. The explicit @as casts pin the exact
    // callback signatures libcurl expects.
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_LOCKFUNC, @as(?*const fn (?*c.CURL, c.curl_lock_data, c.curl_lock_access, ?*anyopaque) callconv(.c) void, &lockFunc)));
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_UNLOCKFUNC, @as(?*const fn (?*c.CURL, c.curl_lock_data, ?*anyopaque) callconv(.c) void, &unlockFunc)));
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_USERDATA, @as(?*anyopaque, share)));
    // Configure what data to share across easy handles
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_SHARE, c.CURL_LOCK_DATA_DNS));
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_SHARE, c.CURL_LOCK_DATA_SSL_SESSION));
    try errorSHCheck(c.curl_share_setopt(handle, c.CURLSHOPT_SHARE, c.CURL_LOCK_DATA_CONNECT));
    return share;
}
/// Cleans up the share handle and frees this instance.
/// NOTE(review): libcurl requires every easy handle attached to a share
/// to be cleaned up before curl_share_cleanup — callers must guarantee
/// that ordering (confirm against session teardown).
pub fn deinit(self: *CurlShare) void {
    _ = c.curl_share_cleanup(self.handle);
    self.allocator.destroy(self);
}
/// Returns the raw CURLSH handle, suitable for CURLOPT_SHARE on easy handles.
pub fn getHandle(self: *CurlShare) *c.CURLSH {
    return self.handle;
}
// libcurl lock callback (invoked from whichever thread drives curl).
// `userptr` is the CurlShare registered via CURLSHOPT_USERDATA in init().
fn lockFunc(_: ?*c.CURL, data: c.curl_lock_data, _: ?*anyopaque, userptr: ?*anyopaque) callconv(.c) void {
    const self: *CurlShare = @ptrCast(@alignCast(userptr));
    // Data classes we don't share have no mutex: nothing to lock.
    const mutex = self.getMutex(data) orelse return;
    mutex.lock();
}
// libcurl unlock callback; must pair with lockFunc for the same data class.
fn unlockFunc(_: ?*c.CURL, data: c.curl_lock_data, userptr: ?*anyopaque) callconv(.c) void {
    const self: *CurlShare = @ptrCast(@alignCast(userptr));
    const mutex = self.getMutex(data) orelse return;
    mutex.unlock();
}
// Maps a libcurl data class to its mutex; null for classes we don't share.
// NOTE(review): CURL_LOCK_DATA_SHARE (libcurl locking the share object
// itself) falls into the null branch and is therefore unsynchronized —
// confirm libcurl never requires it with the options configured in init().
fn getMutex(self: *CurlShare, data: c.curl_lock_data) ?*std.Thread.Mutex {
    return switch (data) {
        c.CURL_LOCK_DATA_DNS => &self.dns_mutex,
        c.CURL_LOCK_DATA_SSL_SESSION => &self.ssl_mutex,
        c.CURL_LOCK_DATA_CONNECT => &self.conn_mutex,
        else => null,
    };
}
/// Translates a libcurl share return code into a zig error.
/// CURLSHE_OK maps to success; unknown codes become error.ShareUnknown.
fn errorSHCheck(code: c.CURLSHcode) !void {
    switch (code) {
        c.CURLSHE_OK => {},
        c.CURLSHE_BAD_OPTION => return error.ShareBadOption,
        c.CURLSHE_IN_USE => return error.ShareInUse,
        c.CURLSHE_INVALID => return error.ShareInvalid,
        c.CURLSHE_NOMEM => return error.OutOfMemory,
        c.CURLSHE_NOT_BUILT_IN => return error.ShareNotBuiltIn,
        else => return error.ShareUnknown,
    }
}

View File

@@ -66,7 +66,7 @@ pub fn init(allocator: Allocator, opts: Opts) !Http {
ca_blob = try loadCerts(allocator, arena.allocator()); ca_blob = try loadCerts(allocator, arena.allocator());
} }
var client = try Client.init(allocator, ca_blob, adjusted_opts); var client = try Client.init(allocator, ca_blob, adjusted_opts, null);
errdefer client.deinit(); errdefer client.deinit();
return .{ return .{

View File

@@ -19,6 +19,9 @@
const std = @import("std"); const std = @import("std");
pub const App = @import("App.zig"); pub const App = @import("App.zig");
pub const Server = @import("Server.zig"); pub const Server = @import("Server.zig");
pub const SharedState = @import("SharedState.zig");
pub const SessionThread = @import("SessionThread.zig");
pub const SessionManager = @import("SessionManager.zig");
pub const Page = @import("browser/Page.zig"); pub const Page = @import("browser/Page.zig");
pub const Browser = @import("browser/Browser.zig"); pub const Browser = @import("browser/Browser.zig");
pub const Session = @import("browser/Session.zig"); pub const Session = @import("browser/Session.zig");
@@ -37,7 +40,7 @@ pub const FetchOpts = struct {
writer: ?*std.Io.Writer = null, writer: ?*std.Io.Writer = null,
}; };
pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void {
var browser = try Browser.init(app); var browser = try Browser.initFromApp(app);
defer browser.deinit(); defer browser.deinit();
var session = try browser.newSession(); var session = try browser.newSession();

View File

@@ -108,8 +108,12 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
return args.printUsageAndExit(false); return args.printUsageAndExit(false);
}; };
// Create SharedState for multi-session CDP server
const shared = try app.createSharedState();
defer shared.deinit();
// _server is global to handle graceful shutdown. // _server is global to handle graceful shutdown.
var server = try lp.Server.init(app, address); var server = try lp.Server.init(shared, address, app.config.max_sessions, app.config.session_memory_limit);
defer server.deinit(); defer server.deinit();
try sighandler.on(lp.Server.stop, .{&server}); try sighandler.on(lp.Server.stop, .{&server});

View File

@@ -43,7 +43,7 @@ pub fn main() !void {
var test_arena = std.heap.ArenaAllocator.init(allocator); var test_arena = std.heap.ArenaAllocator.init(allocator);
defer test_arena.deinit(); defer test_arena.deinit();
var browser = try lp.Browser.init(app); var browser = try lp.Browser.initFromApp(app);
defer browser.deinit(); defer browser.deinit();
const session = try browser.newSession(); const session = try browser.newSession();

View File

@@ -65,7 +65,7 @@ pub fn main() !void {
}); });
defer app.deinit(); defer app.deinit();
var browser = try lp.Browser.init(app); var browser = try lp.Browser.initFromApp(app);
defer browser.deinit(); defer browser.deinit();
// An arena for running each tests. Is reset after every test. // An arena for running each tests. Is reset after every test.

View File

@@ -441,8 +441,10 @@ const log = @import("log.zig");
const TestHTTPServer = @import("TestHTTPServer.zig"); const TestHTTPServer = @import("TestHTTPServer.zig");
const Server = @import("Server.zig"); const Server = @import("Server.zig");
const SharedState = @import("SharedState.zig");
var test_cdp_server: ?Server = null; var test_cdp_server: ?Server = null;
var test_http_server: ?TestHTTPServer = null; var test_http_server: ?TestHTTPServer = null;
var test_shared_state: ?*SharedState = null;
test "tests:beforeAll" { test "tests:beforeAll" {
log.opts.level = .warn; log.opts.level = .warn;
@@ -455,7 +457,7 @@ test "tests:beforeAll" {
}); });
errdefer test_app.deinit(); errdefer test_app.deinit();
test_browser = try Browser.init(test_app); test_browser = try Browser.initFromApp(test_app);
errdefer test_browser.deinit(); errdefer test_browser.deinit();
test_session = try test_browser.newSession(); test_session = try test_browser.newSession();
@@ -483,6 +485,10 @@ test "tests:afterAll" {
if (test_cdp_server) |*server| { if (test_cdp_server) |*server| {
server.deinit(); server.deinit();
} }
if (test_shared_state) |shared| {
shared.deinit();
test_shared_state = null;
}
if (test_http_server) |*server| { if (test_http_server) |*server| {
server.deinit(); server.deinit();
} }
@@ -495,13 +501,16 @@ test "tests:afterAll" {
fn serveCDP(wg: *std.Thread.WaitGroup) !void { fn serveCDP(wg: *std.Thread.WaitGroup) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 9583); const address = try std.net.Address.parseIp("127.0.0.1", 9583);
test_cdp_server = try Server.init(test_app, address);
var server = try Server.init(test_app, address); // Create SharedState by borrowing V8 resources from test_app
test_shared_state = try SharedState.initFromApp(test_app, @import("root").tracking_allocator);
var server = try Server.init(test_shared_state.?, address, 10, 64 * 1024 * 1024);
test_cdp_server = server;
defer server.deinit(); defer server.deinit();
wg.finish(); wg.finish();
test_cdp_server.?.run(address, 5) catch |err| { server.run(address, 5) catch |err| {
std.debug.print("CDP server error: {}", .{err}); std.debug.print("CDP server error: {}", .{err});
return err; return err;
}; };