Run some cdp connections

This commit is contained in:
Nikolay Govorov
2026-01-26 07:30:15 +00:00
parent 0764a44e1d
commit dc7ce0db89
21 changed files with 756 additions and 276 deletions

View File

@@ -27,17 +27,17 @@ const Platform = @import("browser/js/Platform.zig");
const Telemetry = @import("telemetry/telemetry.zig").Telemetry; const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
pub const Http = @import("http/Http.zig"); pub const Http = @import("http/Http.zig");
pub const Network = Http.Network;
pub const ArenaPool = @import("ArenaPool.zig"); pub const ArenaPool = @import("ArenaPool.zig");
pub const Notification = @import("Notification.zig"); pub const Notification = @import("Notification.zig");
const App = @This(); const App = @This();
http: Http,
config: *const Config, config: *const Config,
network: Network,
platform: Platform, platform: Platform,
snapshot: Snapshot, snapshot: Snapshot,
telemetry: Telemetry, telemetry: Telemetry,
allocator: Allocator,
arena_pool: ArenaPool, arena_pool: ArenaPool,
app_dir_path: ?[]const u8, app_dir_path: ?[]const u8,
notification: *Notification, notification: *Notification,
@@ -48,14 +48,13 @@ pub fn init(allocator: Allocator, config: *const Config) !*App {
errdefer allocator.destroy(app); errdefer allocator.destroy(app);
app.config = config; app.config = config;
app.allocator = allocator;
app.network = try Network.init(allocator, config);
errdefer app.network.deinit();
app.notification = try Notification.init(allocator, null); app.notification = try Notification.init(allocator, null);
errdefer app.notification.deinit(); errdefer app.notification.deinit();
app.http = try Http.init(allocator, config);
errdefer app.http.deinit();
app.platform = try Platform.init(); app.platform = try Platform.init();
errdefer app.platform.deinit(); errdefer app.platform.deinit();
@@ -64,7 +63,7 @@ pub fn init(allocator: Allocator, config: *const Config) !*App {
app.app_dir_path = getAndMakeAppDir(allocator); app.app_dir_path = getAndMakeAppDir(allocator);
app.telemetry = try Telemetry.init(app, config.mode); app.telemetry = try Telemetry.init(allocator, app, config.mode);
errdefer app.telemetry.deinit(); errdefer app.telemetry.deinit();
try app.telemetry.register(app.notification); try app.telemetry.register(app.notification);
@@ -75,22 +74,21 @@ pub fn init(allocator: Allocator, config: *const Config) !*App {
return app; return app;
} }
pub fn deinit(self: *App) void { pub fn deinit(self: *App, allocator: Allocator) void {
if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) {
return; return;
} }
const allocator = self.allocator;
if (self.app_dir_path) |app_dir_path| { if (self.app_dir_path) |app_dir_path| {
allocator.free(app_dir_path); allocator.free(app_dir_path);
self.app_dir_path = null; self.app_dir_path = null;
} }
self.telemetry.deinit(); self.telemetry.deinit();
self.notification.deinit(); self.notification.deinit();
self.http.deinit();
self.snapshot.deinit(); self.snapshot.deinit();
self.platform.deinit(); self.platform.deinit();
self.arena_pool.deinit(); self.arena_pool.deinit();
self.network.deinit();
allocator.destroy(self); allocator.destroy(self);
} }

View File

@@ -29,6 +29,7 @@ free_list_len: u16 = 0,
free_list: ?*Entry = null, free_list: ?*Entry = null,
free_list_max: u16, free_list_max: u16,
entry_pool: std.heap.MemoryPool(Entry), entry_pool: std.heap.MemoryPool(Entry),
mutex: std.Thread.Mutex = .{},
const Entry = struct { const Entry = struct {
next: ?*Entry, next: ?*Entry,
@@ -41,6 +42,7 @@ pub fn init(allocator: Allocator) ArenaPool {
.free_list_max = 512, // TODO make configurable .free_list_max = 512, // TODO make configurable
.retain_bytes = 1024 * 16, // TODO make configurable .retain_bytes = 1024 * 16, // TODO make configurable
.entry_pool = std.heap.MemoryPool(Entry).init(allocator), .entry_pool = std.heap.MemoryPool(Entry).init(allocator),
.mutex = .{},
}; };
} }
@@ -54,6 +56,9 @@ pub fn deinit(self: *ArenaPool) void {
} }
pub fn acquire(self: *ArenaPool) !Allocator { pub fn acquire(self: *ArenaPool) !Allocator {
self.mutex.lock();
defer self.mutex.unlock();
if (self.free_list) |entry| { if (self.free_list) |entry| {
self.free_list = entry.next; self.free_list = entry.next;
return entry.arena.allocator(); return entry.arena.allocator();
@@ -72,13 +77,18 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void {
const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr)); const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr));
const entry: *Entry = @fieldParentPtr("arena", arena); const entry: *Entry = @fieldParentPtr("arena", arena);
// Reset the arena before acquiring the lock to minimize lock hold time
_ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
self.mutex.lock();
defer self.mutex.unlock();
if (self.free_list_len == self.free_list_max) { if (self.free_list_len == self.free_list_max) {
arena.deinit(); arena.deinit();
self.entry_pool.destroy(entry); self.entry_pool.destroy(entry);
return; return;
} }
_ = arena.reset(.{ .retain_with_limit = self.retain_bytes });
entry.next = self.free_list; entry.next = self.free_list;
self.free_list = entry; self.free_list = entry;
} }

View File

@@ -30,6 +30,13 @@ pub const RunMode = enum {
version, version,
}; };
pub const MAX_HTTP_REQUEST_SIZE = 4096;
// max message size
// +14 for max websocket payload overhead
// +140 for the max control packet that might be interleaved in a message
pub const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
mode: Mode, mode: Mode,
exec_name: []const u8, exec_name: []const u8,
@@ -116,12 +123,26 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 {
}; };
} }
pub fn maxConnections(self: *const Config) u16 {
return switch (self.mode) {
.serve => |opts| opts.max_connections,
else => unreachable,
};
}
pub fn maxMemoryPerTab(self: *const Config) usize {
return switch (self.mode) {
.serve => |opts| @intCast(opts.max_memory_per_tab),
else => unreachable,
};
}
pub fn userAgent(self: *const Config, allocator: Allocator) ![:0]const u8 { pub fn userAgent(self: *const Config, allocator: Allocator) ![:0]const u8 {
const base = "User-Agent: Lightpanda/1.0"; const base = "User-Agent: Lightpanda/1.0";
if (self.userAgentSuffix()) |suffix| { if (self.userAgentSuffix()) |suffix| {
return try std.fmt.allocPrintSentinel(allocator, "{s} {s}", .{ base, suffix }, 0); return try std.fmt.allocPrintSentinel(allocator, "{s} {s}", .{ base, suffix }, 0);
} }
return base; return try allocator.dupeZ(u8, base);
} }
pub const Mode = union(RunMode) { pub const Mode = union(RunMode) {

75
src/LimitedAllocator.zig Normal file
View File

@@ -0,0 +1,75 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const LimitedAllocator = @This();
parent: Allocator,
limit: usize,
allocated: usize = 0,
pub fn init(parent: Allocator, limit: usize) LimitedAllocator {
return .{ .parent = parent, .limit = limit };
}
pub fn allocator(self: *LimitedAllocator) Allocator {
return .{ .ptr = self, .vtable = &vtable };
}
const vtable: Allocator.VTable = .{
.alloc = alloc,
.resize = resize,
.remap = remap,
.free = free,
};
fn alloc(ctx: *anyopaque, len: usize, alignment: std.mem.Alignment, ret_addr: usize) ?[*]u8 {
const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
if (self.allocated + len > self.limit) return null;
const result = self.parent.rawAlloc(len, alignment, ret_addr);
if (result != null) self.allocated += len;
return result;
}
fn resize(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) bool {
const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
if (new_len > memory.len and self.allocated + new_len - memory.len > self.limit) return false;
if (self.parent.rawResize(memory, alignment, new_len, ret_addr)) {
if (new_len > memory.len) self.allocated += new_len - memory.len else self.allocated -= memory.len - new_len;
return true;
}
return false;
}
fn remap(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) ?[*]u8 {
const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
if (new_len > memory.len and self.allocated + new_len - memory.len > self.limit) return null;
const result = self.parent.rawRemap(memory, alignment, new_len, ret_addr);
if (result != null) {
if (new_len > memory.len) self.allocated += new_len - memory.len else self.allocated -= memory.len - new_len;
}
return result;
}
fn free(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, ret_addr: usize) void {
const self: *LimitedAllocator = @ptrCast(@alignCast(ctx));
self.parent.rawFree(memory, alignment, ret_addr);
self.allocated -= memory.len;
}

View File

@@ -28,34 +28,31 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const log = @import("log.zig"); const log = @import("log.zig");
const App = @import("App.zig"); const App = @import("App.zig");
const Config = @import("Config.zig");
const CDP = @import("cdp/cdp.zig").CDP; const CDP = @import("cdp/cdp.zig").CDP;
const Http = App.Http;
const MAX_HTTP_REQUEST_SIZE = 4096; const ThreadPool = @import("ThreadPool.zig");
const LimitedAllocator = @import("LimitedAllocator.zig");
// max message size
// +14 for max websocket payload overhead
// +140 for the max control packet that might be interleaved in a message
const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140;
const Server = @This(); const Server = @This();
app: *App, app: *App,
shutdown: bool = false, shutdown: bool = false,
allocator: Allocator, allocator: Allocator,
client: ?posix.socket_t,
listener: ?posix.socket_t, listener: ?posix.socket_t,
json_version_response: []const u8, json_version_response: []const u8,
thread_pool: ThreadPool,
pub fn init(app: *App, address: net.Address) !Server { pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server {
const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address); const json_version_response = try buildJSONVersionResponse(allocator, address);
errdefer allocator.free(json_version_response); errdefer allocator.free(json_version_response);
return .{ return .{
.app = app, .app = app,
.client = null,
.listener = null, .listener = null,
.allocator = allocator, .allocator = allocator,
.json_version_response = json_version_response, .json_version_response = json_version_response,
.thread_pool = ThreadPool.init(allocator, app.config.maxConnections()),
}; };
} }
@@ -81,12 +78,11 @@ pub fn stop(self: *Server) void {
} }
pub fn deinit(self: *Server) void { pub fn deinit(self: *Server) void {
self.thread_pool.deinit();
if (self.listener) |listener| { if (self.listener) |listener| {
posix.close(listener); posix.close(listener);
self.listener = null; self.listener = null;
} }
// *if* server.run is running, we should really wait for it to return
// before existing from here.
self.allocator.free(self.json_version_response); self.allocator.free(self.json_version_response);
} }
@@ -119,12 +115,81 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
} }
}; };
self.client = socket; self.thread_pool.spawn(handleConnection, .{ self, socket, timeout_ms }, shutdownConnection, .{socket}) catch |err| {
defer if (self.client) |s| { log.err(.app, "CDP spawn", .{ .err = err });
posix.close(s); posix.close(socket);
self.client = null;
}; };
}
}
fn shutdownConnection(socket: posix.socket_t) void {
posix.shutdown(socket, .recv) catch {};
}
fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer posix.close(socket);
var limited = LimitedAllocator.init(self.allocator, self.app.config.maxMemoryPerTab());
const client_allocator = limited.allocator();
// Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = client_allocator.create(Client) catch |err| {
log.err(.app, "CDP client create", .{ .err = err });
return;
};
defer client_allocator.destroy(client);
client.* = Client.init(
socket,
client_allocator,
self.app,
self.json_version_response,
timeout_ms,
) catch |err| {
log.err(.app, "CDP client init", .{ .err = err });
return;
};
defer client.deinit();
client.run();
}
// Handle exactly one TCP connection.
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
allocator: Allocator,
app: *App,
http: Http,
json_version_response: []const u8,
reader: Reader(true),
socket: posix.socket_t,
socket_flags: usize,
send_arena: ArenaAllocator,
timeout_ms: u32,
const EMPTY_PONG = [_]u8{ 138, 0 };
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
fn init(
socket: posix.socket_t,
allocator: Allocator,
app: *App,
json_version_response: []const u8,
timeout_ms: u32,
) !Client {
if (log.enabled(.app, .info)) { if (log.enabled(.app, .info)) {
var client_address: std.net.Address = undefined; var client_address: std.net.Address = undefined;
var socklen: posix.socklen_t = @sizeOf(net.Address); var socklen: posix.socklen_t = @sizeOf(net.Address);
@@ -132,70 +197,101 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
log.info(.app, "client connected", .{ .ip = client_address }); log.info(.app, "client connected", .{ .ip = client_address });
} }
self.readLoop(socket, timeout_ms) catch |err| { const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
log.err(.app, "CDP client loop", .{ .err = err }); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
var reader = try Reader(true).init(allocator);
errdefer reader.deinit();
var http = try app.network.createHttp(allocator);
errdefer http.deinit();
return .{
.socket = socket,
.allocator = allocator,
.app = app,
.http = http,
.json_version_response = json_version_response,
.reader = reader,
.mode = .{ .http = {} },
.socket_flags = socket_flags,
.send_arena = ArenaAllocator.init(allocator),
.timeout_ms = timeout_ms,
}; };
} }
fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
self.reader.deinit();
self.send_arena.deinit();
self.http.deinit();
} }
fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { fn run(self: *Client) void {
// This shouldn't be necessary, but the Client is HUGE (> 512KB) because var http = &self.http;
// it has a large read buffer. I don't know why, but v8 crashes if this
// is on the stack (and I assume it's related to its size).
const client = try self.allocator.create(Client);
defer self.allocator.destroy(client);
client.* = try Client.init(socket, self);
defer client.deinit();
var http = &self.app.http;
http.addCDPClient(.{ http.addCDPClient(.{
.socket = socket, .socket = self.socket,
.ctx = client, .ctx = self,
.blocking_read_start = Client.blockingReadStart, .blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead, .blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop, .blocking_read_end = Client.blockingReadStop,
}); });
defer http.removeCDPClient(); defer http.removeCDPClient();
lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); self.httpLoop(http) catch |err| {
log.err(.app, "CDP client loop", .{ .err = err });
};
}
fn httpLoop(self: *Client, http: anytype) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
while (true) { while (true) {
if (http.poll(timeout_ms) != .cdp_socket) { if (http.poll(self.timeout_ms) != .cdp_socket) {
log.info(.app, "CDP timeout", .{}); log.info(.app, "CDP timeout", .{});
return; return;
} }
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
if (client.mode == .cdp) { if (self.mode == .cdp) {
break; // switch to our CDP loop break;
} }
} }
var cdp = &client.mode.cdp; return self.cdpLoop(http);
}
fn cdpLoop(self: *Client, http: anytype) !void {
var cdp = &self.mode.cdp;
var last_message = timestamp(.monotonic); var last_message = timestamp(.monotonic);
var ms_remaining = timeout_ms; var ms_remaining = self.timeout_ms;
while (true) { while (true) {
switch (cdp.pageWait(ms_remaining)) { switch (cdp.pageWait(ms_remaining)) {
.cdp_socket => { .cdp_socket => {
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
last_message = timestamp(.monotonic); last_message = timestamp(.monotonic);
ms_remaining = timeout_ms; ms_remaining = self.timeout_ms;
}, },
.no_page => { .no_page => {
if (http.poll(ms_remaining) != .cdp_socket) { if (http.poll(ms_remaining) != .cdp_socket) {
log.info(.app, "CDP timeout", .{}); log.info(.app, "CDP timeout", .{});
return; return;
} }
if (client.readSocket() == false) { if (self.readSocket() == false) {
return; return;
} }
last_message = timestamp(.monotonic); last_message = timestamp(.monotonic);
ms_remaining = timeout_ms; ms_remaining = self.timeout_ms;
}, },
.done => { .done => {
const elapsed = timestamp(.monotonic) - last_message; const elapsed = timestamp(.monotonic) - last_message;
@@ -210,57 +306,6 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
} }
} }
pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances
// should eventually be upgraded to a websocket connections
mode: union(enum) {
http: void,
cdp: CDP,
},
server: *Server,
reader: Reader(true),
socket: posix.socket_t,
socket_flags: usize,
send_arena: ArenaAllocator,
const EMPTY_PONG = [_]u8{ 138, 0 };
// CLOSE, 2 length, code
const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000
const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009
const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002
// "private-use" close codes must be from 4000-49999
const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000
fn init(socket: posix.socket_t, server: *Server) !Client {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
// we expect the socket to come to us as nonblocking
lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{});
var reader = try Reader(true).init(server.allocator);
errdefer reader.deinit();
return .{
.socket = socket,
.server = server,
.reader = reader,
.mode = .{ .http = {} },
.socket_flags = socket_flags,
.send_arena = ArenaAllocator.init(server.allocator),
};
}
fn deinit(self: *Client) void {
switch (self.mode) {
.cdp => |*cdp| cdp.deinit(),
.http => {},
}
self.reader.deinit();
self.send_arena.deinit();
}
fn blockingReadStart(ctx: *anyopaque) bool { fn blockingReadStart(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx)); const self: *Client = @ptrCast(@alignCast(ctx));
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| { _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| {
@@ -315,7 +360,7 @@ pub const Client = struct {
lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos }); lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos });
const request = self.reader.buf[0..self.reader.len]; const request = self.reader.buf[0..self.reader.len];
if (request.len > MAX_HTTP_REQUEST_SIZE) { if (request.len > Config.MAX_HTTP_REQUEST_SIZE) {
self.writeHTTPErrorResponse(413, "Request too large"); self.writeHTTPErrorResponse(413, "Request too large");
return error.RequestTooLarge; return error.RequestTooLarge;
} }
@@ -368,7 +413,7 @@ pub const Client = struct {
} }
if (std.mem.eql(u8, url, "/json/version")) { if (std.mem.eql(u8, url, "/json/version")) {
try self.send(self.server.json_version_response); try self.send(self.json_version_response);
// Chromedp (a Go driver) does an http request to /json/version // Chromedp (a Go driver) does an http request to /json/version
// then to / (websocket upgrade) using a different connection. // then to / (websocket upgrade) using a different connection.
// Since we only allow 1 connection at a time, the 2nd one (the // Since we only allow 1 connection at a time, the 2nd one (the
@@ -473,7 +518,7 @@ pub const Client = struct {
break :blk res; break :blk res;
}; };
self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; self.mode = .{ .cdp = try CDP.init(self.allocator, self.app, &self.http, self) };
return self.send(response); return self.send(response);
} }
@@ -708,7 +753,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
if (message_len > 125) { if (message_len > 125) {
return error.ControlTooLarge; return error.ControlTooLarge;
} }
} else if (message_len > MAX_MESSAGE_SIZE) { } else if (message_len > Config.MAX_MESSAGE_SIZE) {
return error.TooLarge; return error.TooLarge;
} else if (message_len > self.buf.len) { } else if (message_len > self.buf.len) {
const len = self.buf.len; const len = self.buf.len;
@@ -736,7 +781,7 @@ fn Reader(comptime EXPECT_MASK: bool) type {
if (is_continuation) { if (is_continuation) {
const fragments = &(self.fragments orelse return error.InvalidContinuation); const fragments = &(self.fragments orelse return error.InvalidContinuation);
if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) { if (fragments.message.items.len + message_len > Config.MAX_MESSAGE_SIZE) {
return error.TooLarge; return error.TooLarge;
} }

257
src/ThreadPool.zig Normal file
View File

@@ -0,0 +1,257 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const ThreadPool = @This();
allocator: Allocator,
active: u16,
shutdown: bool,
max_threads: u16,
lock: std.Thread.RwLock,
threads: std.DoublyLinkedList,
const Func = struct {
ptr: *const fn (*anyopaque) void,
args: []u8,
alignment: std.mem.Alignment,
fn init(allocator: Allocator, func: anytype, args: anytype) !Func {
const Args = @TypeOf(args);
const Wrapper = struct {
fn call(ctx: *anyopaque) void {
const a: *Args = @ptrCast(@alignCast(ctx));
@call(.auto, func, a.*);
}
};
const alignment: std.mem.Alignment = .of(Args);
const size = @sizeOf(Args);
if (size == 0) {
return .{
.ptr = Wrapper.call,
.args = &.{},
.alignment = alignment,
};
}
const args_buf = try allocator.alignedAlloc(u8, alignment, size);
const bytes: []const u8 = @ptrCast((&args)[0..1]);
@memcpy(args_buf, bytes);
return .{
.ptr = Wrapper.call,
.args = args_buf,
.alignment = alignment,
};
}
fn call(self: Func) void {
self.ptr(@ptrCast(self.args.ptr));
}
fn free(self: Func, allocator: Allocator) void {
if (self.args.len > 0) {
allocator.rawFree(self.args, self.alignment, @returnAddress());
}
}
};
const Worker = struct {
run_fn: Func,
shutdown_fn: Func,
pool: *ThreadPool,
thread: std.Thread,
node: std.DoublyLinkedList.Node,
fn run(self: *Worker) void {
self.run_fn.call();
self.deinit();
}
fn deinit(self: *Worker) void {
const pool = self.pool;
pool.lock.lock();
pool.threads.remove(&self.node);
pool.active -= 1;
pool.lock.unlock();
self.run_fn.free(pool.allocator);
self.shutdown_fn.free(pool.allocator);
pool.allocator.destroy(self);
}
fn callShutdown(self: *Worker) void {
self.shutdown_fn.call();
}
};
pub fn init(allocator: Allocator, max_threads: u16) ThreadPool {
return .{
.allocator = allocator,
.max_threads = max_threads,
.active = 0,
.shutdown = false,
.threads = .{},
.lock = .{},
};
}
pub fn deinit(self: *ThreadPool) void {
self.join();
}
/// Spawn a thread to run run_func(run_args). shutdown_func is called during join().
pub fn spawn(
self: *ThreadPool,
run_func: anytype,
run_args: std.meta.ArgsTuple(@TypeOf(run_func)),
shutdown_func: anytype,
shutdown_args: std.meta.ArgsTuple(@TypeOf(shutdown_func)),
) !void {
const run_fn = try Func.init(self.allocator, run_func, run_args);
errdefer run_fn.free(self.allocator);
const shutdown_fn = try Func.init(self.allocator, shutdown_func, shutdown_args);
errdefer shutdown_fn.free(self.allocator);
const worker = try self.allocator.create(Worker);
errdefer self.allocator.destroy(worker);
worker.* = .{
.run_fn = run_fn,
.shutdown_fn = shutdown_fn,
.pool = self,
.thread = undefined,
.node = .{},
};
self.lock.lock();
defer self.lock.unlock();
if (self.shutdown) {
return error.PoolShuttingDown;
}
if (self.active >= self.max_threads) {
return error.MaxThreadsReached;
}
self.threads.append(&worker.node);
self.active += 1;
worker.thread = std.Thread.spawn(.{}, Worker.run, .{worker}) catch |err| {
self.threads.remove(&worker.node);
self.active -= 1;
return err;
};
}
/// Number of active threads.
pub fn count(self: *ThreadPool) u16 {
self.lock.lockShared();
defer self.lock.unlockShared();
return self.active;
}
/// Wait for all threads to finish.
pub fn join(self: *ThreadPool) void {
self.lock.lock();
self.shutdown = true;
// Call shutdown on all active workers
var node = self.threads.first;
while (node) |n| {
const worker: *Worker = @fieldParentPtr("node", n);
worker.callShutdown();
node = n.next;
}
self.lock.unlock();
while (true) {
self.lock.lockShared();
const active = self.active;
self.lock.unlockShared();
if (active == 0) break;
std.Thread.sleep(10 * std.time.ns_per_ms);
}
}
pub fn isShuttingDown(self: *ThreadPool) bool {
self.lock.lockShared();
defer self.lock.unlockShared();
return self.shutdown;
}
// Tests
const testing = std.testing;
fn noop() void {}
fn increment(counter: *std.atomic.Value(u32)) void {
_ = counter.fetchAdd(1, .acq_rel);
}
fn block(flag: *std.atomic.Value(bool)) void {
while (!flag.load(.acquire)) {
std.Thread.sleep(1 * std.time.ns_per_ms);
}
}
fn unblock(flag: *std.atomic.Value(bool)) void {
flag.store(true, .release);
}
test "ThreadPool: spawn and join" {
var counter = std.atomic.Value(u32).init(0);
var pool = ThreadPool.init(testing.allocator, 4);
defer pool.deinit();
try pool.spawn(increment, .{&counter}, noop, .{});
try pool.spawn(increment, .{&counter}, noop, .{});
try pool.spawn(increment, .{&counter}, noop, .{});
pool.join();
try testing.expectEqual(@as(u32, 3), counter.load(.acquire));
try testing.expectEqual(@as(u16, 0), pool.count());
}
test "ThreadPool: max threads limit" {
var flag = std.atomic.Value(bool).init(false);
var pool = ThreadPool.init(testing.allocator, 2);
defer pool.deinit();
try pool.spawn(block, .{&flag}, unblock, .{&flag});
try pool.spawn(block, .{&flag}, unblock, .{&flag});
try testing.expectError(error.MaxThreadsReached, pool.spawn(block, .{&flag}, unblock, .{&flag}));
try testing.expectEqual(@as(u16, 2), pool.count());
// deinit will call unblock via shutdown callback
}

View File

@@ -26,7 +26,8 @@ const log = @import("../log.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const ArenaPool = App.ArenaPool; const ArenaPool = App.ArenaPool;
const HttpClient = App.Http.Client; const Http = App.Http;
const HttpClient = Http.Client;
const Notification = App.Notification; const Notification = App.Notification;
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -40,35 +41,33 @@ const Browser = @This();
env: js.Env, env: js.Env,
app: *App, app: *App,
http_client: *HttpClient,
session: ?Session, session: ?Session,
allocator: Allocator, allocator: Allocator,
arena_pool: *ArenaPool, arena_pool: *ArenaPool,
http_client: *HttpClient,
call_arena: ArenaAllocator, call_arena: ArenaAllocator,
page_arena: ArenaAllocator, page_arena: ArenaAllocator,
session_arena: ArenaAllocator, session_arena: ArenaAllocator,
transfer_arena: ArenaAllocator, transfer_arena: ArenaAllocator,
notification: *Notification, notification: *Notification,
pub fn init(app: *App) !Browser { pub fn init(allocator: Allocator, app: *App, http_client: *HttpClient) !Browser {
const allocator = app.allocator;
var env = try js.Env.init(allocator, &app.platform, &app.snapshot); var env = try js.Env.init(allocator, &app.platform, &app.snapshot);
errdefer env.deinit(); errdefer env.deinit();
const notification = try Notification.init(allocator, app.notification); const notification = try Notification.init(allocator, app.notification);
app.http.client.notification = notification; http_client.notification = notification;
app.http.client.next_request_id = 0; // Should we track ids in CDP only? http_client.next_request_id = 0; // Should we track ids in CDP only?
errdefer notification.deinit(); errdefer notification.deinit();
return .{ return .{
.app = app, .app = app,
.env = env, .env = env,
.http_client = http_client,
.session = null, .session = null,
.allocator = allocator, .allocator = allocator,
.notification = notification, .notification = notification,
.arena_pool = &app.arena_pool, .arena_pool = &app.arena_pool,
.http_client = app.http.client,
.call_arena = ArenaAllocator.init(allocator), .call_arena = ArenaAllocator.init(allocator),
.page_arena = ArenaAllocator.init(allocator), .page_arena = ArenaAllocator.init(allocator),
.session_arena = ArenaAllocator.init(allocator), .session_arena = ArenaAllocator.init(allocator),

View File

@@ -66,7 +66,7 @@ pub fn init(self: *Session, browser: *Browser) !void {
var executor = try browser.env.newExecutionWorld(); var executor = try browser.env.newExecutionWorld();
errdefer executor.deinit(); errdefer executor.deinit();
const allocator = browser.app.allocator; const allocator = browser.allocator;
const session_allocator = browser.session_arena.allocator(); const session_allocator = browser.session_arena.allocator();
self.* = .{ self.* = .{
@@ -86,7 +86,7 @@ pub fn deinit(self: *Session) void {
self.removePage(); self.removePage();
} }
self.cookie_jar.deinit(); self.cookie_jar.deinit();
self.storage_shed.deinit(self.browser.app.allocator); self.storage_shed.deinit(self.browser.allocator);
self.executor.deinit(); self.executor.deinit();
} }

View File

@@ -27,7 +27,7 @@ _pad: bool = false,
pub const init: Navigator = .{}; pub const init: Navigator = .{};
pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 { pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 {
return page._session.browser.app.http.user_agent; return page._session.browser.app.network.user_agent;
} }
pub fn getAppName(_: *const Navigator) []const u8 { pub fn getAppName(_: *const Navigator) []const u8 {

View File

@@ -26,6 +26,7 @@ const log = @import("../log.zig");
const js = @import("../browser/js/js.zig"); const js = @import("../browser/js/js.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const Http = App.Http;
const Browser = @import("../browser/Browser.zig"); const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig"); const Session = @import("../browser/Session.zig");
const Page = @import("../browser/Page.zig"); const Page = @import("../browser/Page.zig");
@@ -78,9 +79,8 @@ pub fn CDPT(comptime TypeProvider: type) type {
const Self = @This(); const Self = @This();
pub fn init(app: *App, client: TypeProvider.Client) !Self { pub fn init(allocator: Allocator, app: *App, http: *Http, client: TypeProvider.Client) !Self {
const allocator = app.allocator; const browser = try Browser.init(allocator, app, http.client);
const browser = try Browser.init(app);
errdefer browser.deinit(); errdefer browser.deinit();
return .{ return .{

View File

@@ -85,7 +85,7 @@ const TestContext = struct {
self.client = Client.init(self.arena.allocator()); self.client = Client.init(self.arena.allocator());
// Don't use the arena here. We want to detect leaks in CDP. // Don't use the arena here. We want to detect leaks in CDP.
// The arena is only for test-specific stuff // The arena is only for test-specific stuff
self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; self.cdp_ = TestCDP.init(std.testing.allocator, base.test_app, &base.test_http, &self.client.?) catch unreachable;
} }
return &self.cdp_.?; return &self.cdp_.?;
} }

View File

@@ -168,6 +168,8 @@ pub fn deinit(self: *Client) void {
self.abort(); self.abort();
self.handles.deinit(self.allocator); self.handles.deinit(self.allocator);
self.allocator.free(self.user_agent);
_ = c.curl_multi_cleanup(self.multi); _ = c.curl_multi_cleanup(self.multi);
self.transfer_pool.deinit(); self.transfer_pool.deinit();

View File

@@ -41,53 +41,23 @@ const ArenaAllocator = std.heap.ArenaAllocator;
// once for all http connections is a win. // once for all http connections is a win.
const Http = @This(); const Http = @This();
config: *const Config, pub const Network = @import("Network.zig");
network: *Network,
client: *Client, client: *Client,
ca_blob: ?c.curl_blob,
arena: ArenaAllocator,
user_agent: [:0]const u8,
proxy_bearer_header: ?[:0]const u8,
pub fn init(allocator: Allocator, config: *const Config) !Http { pub fn init(allocator: Allocator, network: *Network) !Http {
try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); var client = try Client.init(allocator, network.ca_blob, network.config);
errdefer c.curl_global_cleanup();
if (comptime ENABLE_DEBUG) {
std.debug.print("curl version: {s}\n\n", .{c.curl_version()});
}
var arena = ArenaAllocator.init(allocator);
errdefer arena.deinit();
const user_agent = try config.userAgent(arena.allocator());
var proxy_bearer_header: ?[:0]const u8 = null;
if (config.proxyBearerToken()) |bt| {
proxy_bearer_header = try std.fmt.allocPrintSentinel(arena.allocator(), "Proxy-Authorization: Bearer {s}", .{bt}, 0);
}
var ca_blob: ?c.curl_blob = null;
if (config.tlsVerifyHost()) {
ca_blob = try loadCerts(allocator, arena.allocator());
}
var client = try Client.init(allocator, ca_blob, config);
errdefer client.deinit(); errdefer client.deinit();
return .{ return .{
.arena = arena, .network = network,
.client = client, .client = client,
.ca_blob = ca_blob,
.config = config,
.user_agent = user_agent,
.proxy_bearer_header = proxy_bearer_header,
}; };
} }
pub fn deinit(self: *Http) void { pub fn deinit(self: *Http) void {
self.client.deinit(); self.client.deinit();
c.curl_global_cleanup();
self.arena.deinit();
} }
pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus {
@@ -107,11 +77,11 @@ pub fn removeCDPClient(self: *Http) void {
} }
pub fn newConnection(self: *Http) !Connection { pub fn newConnection(self: *Http) !Connection {
return Connection.init(self.ca_blob, self.config, self.user_agent, self.proxy_bearer_header); return Connection.init(self.network.ca_blob, self.network.config, self.network.user_agent, self.network.proxy_bearer_header);
} }
pub fn newHeaders(self: *const Http) Headers { pub fn newHeaders(self: *const Http) Headers {
return Headers.init(self.user_agent); return Headers.init(self.network.user_agent);
} }
pub const Connection = struct { pub const Connection = struct {
@@ -363,87 +333,6 @@ pub const Method = enum(u8) {
PATCH = 6, PATCH = 6,
}; };
// TODO: on BSD / Linux, we could just read the PEM file directly.
// This whole rescan + decode is really just needed for MacOS. On Linux
// bundle.rescan does find the .pem file(s) which could be in a few different
// places, so it's still useful, just not efficient.
fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
var bundle: std.crypto.Certificate.Bundle = .{};
try bundle.rescan(allocator);
defer bundle.deinit(allocator);
const bytes = bundle.bytes.items;
if (bytes.len == 0) {
log.warn(.app, "No system certificates", .{});
return .{
.len = 0,
.flags = 0,
.data = bytes.ptr,
};
}
const encoder = std.base64.standard.Encoder;
var arr: std.ArrayListUnmanaged(u8) = .empty;
const encoded_size = encoder.calcSize(bytes.len);
const buffer_size = encoded_size +
(bundle.map.count() * 75) + // start / end per certificate + extra, just in case
(encoded_size / 64) // newline per 64 characters
;
try arr.ensureTotalCapacity(arena, buffer_size);
var writer = arr.writer(arena);
var it = bundle.map.valueIterator();
while (it.next()) |index| {
const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*);
try writer.writeAll("-----BEGIN CERTIFICATE-----\n");
var line_writer = LineWriter{ .inner = writer };
try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]);
try writer.writeAll("\n-----END CERTIFICATE-----\n");
}
// Final encoding should not be larger than our initial size estimate
lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len });
return .{
.len = arr.items.len,
.data = arr.items.ptr,
.flags = 0,
};
}
// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is
// what Zig has), with lines wrapped at 64 characters and with a basic header
// and footer
const LineWriter = struct {
col: usize = 0,
inner: std.ArrayListUnmanaged(u8).Writer,
pub fn writeAll(self: *LineWriter, data: []const u8) !void {
var writer = self.inner;
var col = self.col;
const len = 64 - col;
var remain = data;
if (remain.len > len) {
col = 0;
try writer.writeAll(data[0..len]);
try writer.writeByte('\n');
remain = data[len..];
}
while (remain.len > 64) {
try writer.writeAll(remain[0..64]);
try writer.writeByte('\n');
remain = data[len..];
}
try writer.writeAll(remain);
self.col = col + remain.len;
}
};
pub fn debugCallback(_: *c.CURL, msg_type: c.curl_infotype, raw: [*c]u8, len: usize, _: *anyopaque) callconv(.c) void { pub fn debugCallback(_: *c.CURL, msg_type: c.curl_infotype, raw: [*c]u8, len: usize, _: *anyopaque) callconv(.c) void {
const data = raw[0..len]; const data = raw[0..len];
switch (msg_type) { switch (msg_type) {

163
src/http/Network.zig Normal file
View File

@@ -0,0 +1,163 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const lp = @import("lightpanda");
const Allocator = std.mem.Allocator;
const log = @import("../log.zig");
const Config = @import("../Config.zig");
const Http = @import("Http.zig");
pub const c = Http.c;
const Network = @This();
allocator: Allocator,
config: *const Config,
ca_blob: ?c.curl_blob,
user_agent: [:0]const u8,
proxy_bearer_header: ?[:0]const u8,
pub fn init(allocator: Allocator, config: *const Config) !Network {
try Http.errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL));
errdefer c.curl_global_cleanup();
const user_agent = try config.userAgent(allocator);
errdefer allocator.free(user_agent);
var proxy_bearer_header: ?[:0]const u8 = null;
if (config.proxyBearerToken()) |bt| {
proxy_bearer_header = try std.fmt.allocPrintSentinel(allocator, "Proxy-Authorization: Bearer {s}", .{bt}, 0);
}
errdefer if (proxy_bearer_header) |h| allocator.free(h);
var ca_blob: ?c.curl_blob = null;
if (config.tlsVerifyHost()) {
ca_blob = try loadCerts(allocator);
}
return .{
.allocator = allocator,
.config = config,
.ca_blob = ca_blob,
.user_agent = user_agent,
.proxy_bearer_header = proxy_bearer_header,
};
}
pub fn deinit(self: *Network) void {
if (self.ca_blob) |ca_blob| {
const data: [*]u8 = @ptrCast(ca_blob.data);
self.allocator.free(data[0..ca_blob.len]);
}
if (self.proxy_bearer_header) |h| self.allocator.free(h);
self.allocator.free(self.user_agent);
c.curl_global_cleanup();
}
pub fn createHttp(self: *Network, allocator: Allocator) !Http {
return Http.init(allocator, self);
}
// TODO: on BSD / Linux, we could just read the PEM file directly.
// This whole rescan + decode is really just needed for MacOS. On Linux
// bundle.rescan does find the .pem file(s) which could be in a few different
// places, so it's still useful, just not efficient.
fn loadCerts(allocator: Allocator) !c.curl_blob {
var bundle: std.crypto.Certificate.Bundle = .{};
try bundle.rescan(allocator);
defer bundle.deinit(allocator);
const bytes = bundle.bytes.items;
if (bytes.len == 0) {
log.warn(.app, "No system certificates", .{});
return .{
.len = 0,
.flags = 0,
.data = bytes.ptr,
};
}
const encoder = std.base64.standard.Encoder;
var arr: std.ArrayListUnmanaged(u8) = .empty;
const encoded_size = encoder.calcSize(bytes.len);
const buffer_size = encoded_size +
(bundle.map.count() * 75) + // start / end per certificate + extra, just in case
(encoded_size / 64) // newline per 64 characters
;
try arr.ensureTotalCapacity(allocator, buffer_size);
errdefer arr.deinit(allocator);
var writer = arr.writer(allocator);
var it = bundle.map.valueIterator();
while (it.next()) |index| {
const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*);
try writer.writeAll("-----BEGIN CERTIFICATE-----\n");
var line_writer = LineWriter{ .inner = writer };
try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]);
try writer.writeAll("\n-----END CERTIFICATE-----\n");
}
// Final encoding should not be larger than our initial size estimate
lp.assert(buffer_size > arr.items.len, "Network loadCerts", .{ .estimate = buffer_size, .len = arr.items.len });
// Allocate exactly the size needed and copy the data
const result = try allocator.dupe(u8, arr.items);
// Free the original oversized allocation
arr.deinit(allocator);
return .{
.len = result.len,
.data = result.ptr,
.flags = 0,
};
}
// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is
// what Zig has), with lines wrapped at 64 characters and with a basic header
// and footer
const LineWriter = struct {
col: usize = 0,
inner: std.ArrayListUnmanaged(u8).Writer,
pub fn writeAll(self: *LineWriter, data: []const u8) !void {
var writer = self.inner;
var col = self.col;
const len = 64 - col;
var remain = data;
if (remain.len > len) {
col = 0;
try writer.writeAll(data[0..len]);
try writer.writeByte('\n');
remain = data[len..];
}
while (remain.len > 64) {
try writer.writeAll(remain[0..64]);
try writer.writeByte('\n');
remain = data[len..];
}
try writer.writeAll(remain);
self.col = col + remain.len;
}
};

View File

@@ -20,6 +20,7 @@ const std = @import("std");
pub const App = @import("App.zig"); pub const App = @import("App.zig");
pub const Server = @import("Server.zig"); pub const Server = @import("Server.zig");
pub const Config = @import("Config.zig"); pub const Config = @import("Config.zig");
pub const ThreadPool = @import("ThreadPool.zig");
pub const Page = @import("browser/Page.zig"); pub const Page = @import("browser/Page.zig");
pub const Browser = @import("browser/Browser.zig"); pub const Browser = @import("browser/Browser.zig");
pub const Session = @import("browser/Session.zig"); pub const Session = @import("browser/Session.zig");
@@ -37,8 +38,11 @@ pub const FetchOpts = struct {
dump: dump.RootOpts, dump: dump.RootOpts,
writer: ?*std.Io.Writer = null, writer: ?*std.Io.Writer = null,
}; };
pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { pub fn fetch(allocator: std.mem.Allocator, app: *App, url: [:0]const u8, opts: FetchOpts) !void {
var browser = try Browser.init(app); var http = try app.network.createHttp(allocator);
defer http.deinit();
var browser = try Browser.init(allocator, app, http.client);
defer browser.deinit(); defer browser.deinit();
var session = try browser.newSession(); var session = try browser.newSession();

View File

@@ -80,7 +80,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
// _app is global to handle graceful shutdown. // _app is global to handle graceful shutdown.
var app = try App.init(allocator, &args); var app = try App.init(allocator, &args);
defer app.deinit(); defer app.deinit(allocator);
app.telemetry.record(.{ .run = {} }); app.telemetry.record(.{ .run = {} });
switch (args.mode) { switch (args.mode) {
@@ -92,7 +92,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
}; };
// _server is global to handle graceful shutdown. // _server is global to handle graceful shutdown.
var server = try lp.Server.init(app, address); var server = try lp.Server.init(allocator, app, address);
defer server.deinit(); defer server.deinit();
try sighandler.on(lp.Server.stop, .{&server}); try sighandler.on(lp.Server.stop, .{&server});
@@ -122,7 +122,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo
fetch_opts.writer = &writer.interface; fetch_opts.writer = &writer.interface;
} }
lp.fetch(app, url, fetch_opts) catch |err| { lp.fetch(allocator, app, url, fetch_opts) catch |err| {
log.fatal(.app, "fetch error", .{ .err = err, .url = url }); log.fatal(.app, "fetch error", .{ .err = err, .url = url });
return err; return err;
}; };

View File

@@ -42,12 +42,15 @@ pub fn main() !void {
.exec_name = "legacy-test", .exec_name = "legacy-test",
}; };
var app = try lp.App.init(allocator, &config); var app = try lp.App.init(allocator, &config);
defer app.deinit(); defer app.deinit(allocator);
var test_arena = std.heap.ArenaAllocator.init(allocator); var test_arena = std.heap.ArenaAllocator.init(allocator);
defer test_arena.deinit(); defer test_arena.deinit();
var browser = try lp.Browser.init(app); var http = try app.network.createHttp(allocator);
defer http.deinit();
var browser = try lp.Browser.init(allocator, app, http.client);
defer browser.deinit(); defer browser.deinit();
const session = try browser.newSession(); const session = try browser.newSession();

View File

@@ -68,9 +68,12 @@ pub fn main() !void {
.exec_name = "lightpanda-wpt", .exec_name = "lightpanda-wpt",
}; };
var app = try lp.App.init(allocator, &config); var app = try lp.App.init(allocator, &config);
defer app.deinit(); defer app.deinit(allocator);
var browser = try lp.Browser.init(app); var http = try app.network.createHttp(allocator);
defer http.deinit();
var browser = try lp.Browser.init(allocator, app, http.client);
defer browser.deinit(); defer browser.deinit();
// An arena for running each tests. Is reset after every test. // An arena for running each tests. Is reset after every test.

View File

@@ -20,18 +20,21 @@ pub const LightPanda = struct {
allocator: Allocator, allocator: Allocator,
mutex: std.Thread.Mutex, mutex: std.Thread.Mutex,
cond: Thread.Condition, cond: Thread.Condition,
http: Http,
connection: Http.Connection, connection: Http.Connection,
pending: std.DoublyLinkedList, pending: std.DoublyLinkedList,
mem_pool: std.heap.MemoryPool(LightPandaEvent), mem_pool: std.heap.MemoryPool(LightPandaEvent),
pub fn init(app: *App) !LightPanda { pub fn init(allocator: Allocator, app: *App) !LightPanda {
const connection = try app.http.newConnection(); var http = try app.network.createHttp(allocator);
errdefer http.deinit();
const connection = try http.newConnection();
errdefer connection.deinit(); errdefer connection.deinit();
try connection.setURL(URL); try connection.setURL(URL);
try connection.setMethod(.POST); try connection.setMethod(.POST);
const allocator = app.allocator;
return .{ return .{
.cond = .{}, .cond = .{},
.mutex = .{}, .mutex = .{},
@@ -39,6 +42,7 @@ pub const LightPanda = struct {
.thread = null, .thread = null,
.running = true, .running = true,
.allocator = allocator, .allocator = allocator,
.http = http,
.connection = connection, .connection = connection,
.mem_pool = std.heap.MemoryPool(LightPandaEvent).init(allocator), .mem_pool = std.heap.MemoryPool(LightPandaEvent).init(allocator),
}; };
@@ -54,6 +58,7 @@ pub const LightPanda = struct {
} }
self.mem_pool.deinit(); self.mem_pool.deinit();
self.connection.deinit(); self.connection.deinit();
self.http.deinit();
} }
pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void {

View File

@@ -34,13 +34,13 @@ fn TelemetryT(comptime P: type) type {
const Self = @This(); const Self = @This();
pub fn init(app: *App, run_mode: Config.RunMode) !Self { pub fn init(allocator: Allocator, app: *App, run_mode: Config.RunMode) !Self {
const disabled = isDisabled(); const disabled = isDisabled();
if (builtin.mode != .Debug and builtin.is_test == false) { if (builtin.mode != .Debug and builtin.is_test == false) {
log.info(.telemetry, "telemetry status", .{ .disabled = disabled }); log.info(.telemetry, "telemetry status", .{ .disabled = disabled });
} }
const provider = try P.init(app); const provider = try P.init(allocator, app);
errdefer provider.deinit(); errdefer provider.deinit();
return .{ return .{
@@ -142,7 +142,7 @@ pub const Event = union(enum) {
}; };
const NoopProvider = struct { const NoopProvider = struct {
fn init(_: *App) !NoopProvider { fn init(_: Allocator, _: *App) !NoopProvider {
return .{}; return .{};
} }
fn deinit(_: NoopProvider) void {} fn deinit(_: NoopProvider) void {}
@@ -158,7 +158,7 @@ test "telemetry: disabled by environment" {
defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY"));
const FailingProvider = struct { const FailingProvider = struct {
fn init(_: *App) !@This() { fn init(_: Allocator, _: *App) !@This() {
return .{}; return .{};
} }
fn deinit(_: @This()) void {} fn deinit(_: @This()) void {}
@@ -167,7 +167,7 @@ test "telemetry: disabled by environment" {
} }
}; };
var telemetry = try TelemetryT(FailingProvider).init(undefined, .serve); var telemetry = try TelemetryT(FailingProvider).init(std.testing.allocator, undefined, .serve);
defer telemetry.deinit(); defer telemetry.deinit();
telemetry.record(.{ .run = {} }); telemetry.record(.{ .run = {} });
} }
@@ -191,7 +191,7 @@ test "telemetry: getOrCreateId" {
} }
test "telemetry: sends event to provider" { test "telemetry: sends event to provider" {
var telemetry = try TelemetryT(MockProvider).init(testing.test_app, .serve); var telemetry = try TelemetryT(MockProvider).init(std.testing.allocator, testing.test_app, .serve);
defer telemetry.deinit(); defer telemetry.deinit();
const mock = &telemetry.provider; const mock = &telemetry.provider;
@@ -211,12 +211,12 @@ const MockProvider = struct {
allocator: Allocator, allocator: Allocator,
events: std.ArrayListUnmanaged(Event), events: std.ArrayListUnmanaged(Event),
fn init(app: *App) !@This() { fn init(allocator: Allocator, _: *App) !@This() {
return .{ return .{
.iid = null, .iid = null,
.run_mode = null, .run_mode = null,
.events = .{}, .events = .{},
.allocator = app.allocator, .allocator = allocator,
}; };
} }
fn deinit(self: *MockProvider) void { fn deinit(self: *MockProvider) void {

View File

@@ -333,6 +333,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool {
} }
pub var test_app: *App = undefined; pub var test_app: *App = undefined;
pub var test_http: App.Http = undefined;
pub var test_browser: Browser = undefined; pub var test_browser: Browser = undefined;
pub var test_session: *Session = undefined; pub var test_session: *Session = undefined;
@@ -465,9 +466,12 @@ test "tests:beforeAll" {
log.opts.format = .pretty; log.opts.format = .pretty;
test_app = try App.init(@import("root").tracking_allocator, &test_config); test_app = try App.init(@import("root").tracking_allocator, &test_config);
errdefer test_app.deinit(); errdefer test_app.deinit(@import("root").tracking_allocator);
test_browser = try Browser.init(test_app); test_http = try test_app.network.createHttp(@import("root").tracking_allocator);
errdefer test_http.deinit();
test_browser = try Browser.init(@import("root").tracking_allocator, test_app, test_http.client);
errdefer test_browser.deinit(); errdefer test_browser.deinit();
test_session = try test_browser.newSession(); test_session = try test_browser.newSession();
@@ -502,14 +506,16 @@ test "tests:afterAll" {
@import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size; @import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size;
test_browser.deinit(); test_browser.deinit();
test_app.deinit(); test_http.deinit();
test_app.deinit(@import("root").tracking_allocator);
} }
fn serveCDP(wg: *std.Thread.WaitGroup) !void { fn serveCDP(wg: *std.Thread.WaitGroup) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 9583); const address = try std.net.Address.parseIp("127.0.0.1", 9583);
test_cdp_server = try Server.init(test_app, address); const test_allocator = @import("root").tracking_allocator;
test_cdp_server = try Server.init(test_allocator, test_app, address);
var server = try Server.init(test_app, address); var server = try Server.init(test_allocator, test_app, address);
defer server.deinit(); defer server.deinit();
wg.finish(); wg.finish();