Use per-cdp connection HttpClient

This commit is contained in:
Nikolay Govorov
2026-02-17 12:37:44 +00:00
parent fbe65cd542
commit 9296c10ca4
9 changed files with 82 additions and 50 deletions

View File

@@ -30,6 +30,8 @@ const log = @import("log.zig");
const App = @import("App.zig"); const App = @import("App.zig");
const Config = @import("Config.zig"); const Config = @import("Config.zig");
const CDP = @import("cdp/cdp.zig").CDP; const CDP = @import("cdp/cdp.zig").CDP;
const Http = @import("http/Http.zig");
const HttpClient = @import("http/Client.zig");
const Server = @This(); const Server = @This();
@@ -147,19 +149,22 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
client.* = try Client.init(socket, self); client.* = try Client.init(socket, self);
defer client.deinit(); defer client.deinit();
var http = &self.app.http; client.http.cdp_client = .{
http.addCDPClient(.{ .socket = client.socket,
.socket = socket,
.ctx = client, .ctx = client,
.blocking_read_start = Client.blockingReadStart, .blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead, .blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop, .blocking_read_end = Client.blockingReadStop,
}); };
defer http.removeCDPClient(); defer client.http.cdp_client = null;
lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{});
while (true) { while (true) {
if (http.poll(timeout_ms) != .cdp_socket) { const status = client.http.tick(timeout_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) {
log.info(.app, "CDP timeout", .{}); log.info(.app, "CDP timeout", .{});
return; return;
} }
@@ -186,7 +191,11 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
ms_remaining = timeout_ms; ms_remaining = timeout_ms;
}, },
.no_page => { .no_page => {
if (http.poll(ms_remaining) != .cdp_socket) { const status = client.http.tick(ms_remaining) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return;
};
if (status != .cdp_socket) {
log.info(.app, "CDP timeout", .{}); log.info(.app, "CDP timeout", .{});
return; return;
} }
@@ -217,6 +226,7 @@ pub const Client = struct {
}, },
server: *Server, server: *Server,
http: *HttpClient,
reader: Reader(true), reader: Reader(true),
socket: posix.socket_t, socket: posix.socket_t,
socket_flags: usize, socket_flags: usize,
@@ -240,9 +250,13 @@ pub const Client = struct {
var reader = try Reader(true).init(server.allocator); var reader = try Reader(true).init(server.allocator);
errdefer reader.deinit(); errdefer reader.deinit();
const http = try server.app.http.createClient(server.allocator);
errdefer http.deinit();
return .{ return .{
.socket = socket, .socket = socket,
.server = server, .server = server,
.http = http,
.reader = reader, .reader = reader,
.mode = .{ .http = {} }, .mode = .{ .http = {} },
.socket_flags = socket_flags, .socket_flags = socket_flags,
@@ -471,7 +485,7 @@ pub const Client = struct {
break :blk res; break :blk res;
}; };
self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; self.mode = .{ .cdp = try CDP.init(self.server.app, self.http, self) };
return self.send(response); return self.send(response);
} }

View File

@@ -24,9 +24,9 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const js = @import("js/js.zig"); const js = @import("js/js.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const HttpClient = @import("../http/Client.zig");
const ArenaPool = App.ArenaPool; const ArenaPool = App.ArenaPool;
const HttpClient = App.Http.Client;
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -47,6 +47,7 @@ http_client: *HttpClient,
const InitOpts = struct { const InitOpts = struct {
env: js.Env.InitOpts = .{}, env: js.Env.InitOpts = .{},
http_client: *HttpClient,
}; };
pub fn init(app: *App, opts: InitOpts) !Browser { pub fn init(app: *App, opts: InitOpts) !Browser {
@@ -61,7 +62,7 @@ pub fn init(app: *App, opts: InitOpts) !Browser {
.session = null, .session = null,
.allocator = allocator, .allocator = allocator,
.arena_pool = &app.arena_pool, .arena_pool = &app.arena_pool,
.http_client = app.http.client, .http_client = opts.http_client,
}; };
} }

View File

@@ -28,6 +28,7 @@ const js = @import("../browser/js/js.zig");
const App = @import("../App.zig"); const App = @import("../App.zig");
const Browser = @import("../browser/Browser.zig"); const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig"); const Session = @import("../browser/Session.zig");
const HttpClient = @import("../http/Client.zig");
const Page = @import("../browser/Page.zig"); const Page = @import("../browser/Page.zig");
const Incrementing = @import("../id.zig").Incrementing; const Incrementing = @import("../id.zig").Incrementing;
const Notification = @import("../Notification.zig"); const Notification = @import("../Notification.zig");
@@ -84,10 +85,11 @@ pub fn CDPT(comptime TypeProvider: type) type {
const Self = @This(); const Self = @This();
pub fn init(app: *App, client: TypeProvider.Client) !Self { pub fn init(app: *App, http_client: *HttpClient, client: TypeProvider.Client) !Self {
const allocator = app.allocator; const allocator = app.allocator;
const browser = try Browser.init(app, .{ const browser = try Browser.init(app, .{
.env = .{ .with_inspector = true }, .env = .{ .with_inspector = true },
.http_client = http_client,
}); });
errdefer browser.deinit(); errdefer browser.deinit();

View File

@@ -85,7 +85,7 @@ const TestContext = struct {
self.client = Client.init(self.arena.allocator()); self.client = Client.init(self.arena.allocator());
// Don't use the arena here. We want to detect leaks in CDP. // Don't use the arena here. We want to detect leaks in CDP.
// The arena is only for test-specific stuff // The arena is only for test-specific stuff
self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; self.cdp_ = TestCDP.init(base.test_app, base.test_http, &self.client.?) catch unreachable;
} }
return &self.cdp_.?; return &self.cdp_.?;
} }

View File

@@ -17,8 +17,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const lp = @import("lightpanda");
const Config = @import("../Config.zig");
pub const c = @cImport({ pub const c = @cImport({
@cInclude("curl/curl.h"); @cInclude("curl/curl.h");
@@ -28,6 +26,8 @@ pub const ENABLE_DEBUG = false;
pub const Client = @import("Client.zig"); pub const Client = @import("Client.zig");
pub const Transfer = Client.Transfer; pub const Transfer = Client.Transfer;
const lp = @import("lightpanda");
const Config = @import("../Config.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const errors = @import("errors.zig"); const errors = @import("errors.zig");
const RobotStore = @import("../browser/Robots.zig").RobotStore; const RobotStore = @import("../browser/Robots.zig").RobotStore;
@@ -42,10 +42,11 @@ const ArenaAllocator = std.heap.ArenaAllocator;
// once for all http connections is a win. // once for all http connections is a win.
const Http = @This(); const Http = @This();
config: *const Config,
client: *Client,
ca_blob: ?c.curl_blob,
arena: ArenaAllocator, arena: ArenaAllocator,
allocator: Allocator,
config: *const Config,
ca_blob: ?c.curl_blob,
robot_store: *RobotStore,
pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http { pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Config) !Http {
try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL));
@@ -60,40 +61,29 @@ pub fn init(allocator: Allocator, robot_store: *RobotStore, config: *const Confi
var ca_blob: ?c.curl_blob = null; var ca_blob: ?c.curl_blob = null;
if (config.tlsVerifyHost()) { if (config.tlsVerifyHost()) {
ca_blob = try loadCerts(allocator, arena.allocator()); ca_blob = try loadCerts(allocator);
} }
var client = try Client.init(allocator, ca_blob, robot_store, config);
errdefer client.deinit();
return .{ return .{
.arena = arena, .arena = arena,
.client = client, .allocator = allocator,
.ca_blob = ca_blob,
.config = config, .config = config,
.ca_blob = ca_blob,
.robot_store = robot_store,
}; };
} }
pub fn deinit(self: *Http) void { pub fn deinit(self: *Http) void {
self.client.deinit(); if (self.ca_blob) |ca_blob| {
const data: [*]u8 = @ptrCast(ca_blob.data);
self.allocator.free(data[0..ca_blob.len]);
}
c.curl_global_cleanup(); c.curl_global_cleanup();
self.arena.deinit(); self.arena.deinit();
} }
pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { pub fn createClient(self: *Http, allocator: Allocator) !*Client {
return self.client.tick(timeout_ms) catch |err| { return Client.init(allocator, self.ca_blob, self.robot_store, self.config);
log.err(.app, "http poll", .{ .err = err });
return .normal;
};
}
pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void {
lp.assert(self.client.cdp_client == null, "Http addCDPClient existing", .{});
self.client.cdp_client = cdp_client;
}
pub fn removeCDPClient(self: *Http) void {
self.client.cdp_client = null;
} }
pub fn newConnection(self: *Http) !Connection { pub fn newConnection(self: *Http) !Connection {
@@ -351,7 +341,7 @@ pub const Method = enum(u8) {
// This whole rescan + decode is really just needed for MacOS. On Linux // This whole rescan + decode is really just needed for MacOS. On Linux
// bundle.rescan does find the .pem file(s) which could be in a few different // bundle.rescan does find the .pem file(s) which could be in a few different
// places, so it's still useful, just not efficient. // places, so it's still useful, just not efficient.
fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { fn loadCerts(allocator: Allocator) !c.curl_blob {
var bundle: std.crypto.Certificate.Bundle = .{}; var bundle: std.crypto.Certificate.Bundle = .{};
try bundle.rescan(allocator); try bundle.rescan(allocator);
defer bundle.deinit(allocator); defer bundle.deinit(allocator);
@@ -374,8 +364,9 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
(bundle.map.count() * 75) + // start / end per certificate + extra, just in case (bundle.map.count() * 75) + // start / end per certificate + extra, just in case
(encoded_size / 64) // newline per 64 characters (encoded_size / 64) // newline per 64 characters
; ;
try arr.ensureTotalCapacity(arena, buffer_size); try arr.ensureTotalCapacity(allocator, buffer_size);
var writer = arr.writer(arena); errdefer arr.deinit(allocator);
var writer = arr.writer(allocator);
var it = bundle.map.valueIterator(); var it = bundle.map.valueIterator();
while (it.next()) |index| { while (it.next()) |index| {
@@ -388,11 +379,16 @@ fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob {
} }
// Final encoding should not be larger than our initial size estimate // Final encoding should not be larger than our initial size estimate
lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len }); lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estimate = buffer_size, .len = arr.items.len });
// Allocate exactly the size needed and copy the data
const result = try allocator.dupe(u8, arr.items);
// Free the original oversized allocation
arr.deinit(allocator);
return .{ return .{
.len = arr.items.len, .len = result.len,
.data = arr.items.ptr, .data = result.ptr,
.flags = 0, .flags = 0,
}; };
} }

View File

@@ -39,10 +39,13 @@ pub const FetchOpts = struct {
writer: ?*std.Io.Writer = null, writer: ?*std.Io.Writer = null,
}; };
pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void {
const http_client = try app.http.createClient(app.allocator);
defer http_client.deinit();
const notification = try Notification.init(app.allocator); const notification = try Notification.init(app.allocator);
defer notification.deinit(); defer notification.deinit();
var browser = try Browser.init(app, .{}); var browser = try Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
var session = try browser.newSession(notification); var session = try browser.newSession(notification);

View File

@@ -46,17 +46,24 @@ pub fn main() !void {
var test_arena = std.heap.ArenaAllocator.init(allocator); var test_arena = std.heap.ArenaAllocator.init(allocator);
defer test_arena.deinit(); defer test_arena.deinit();
var browser = try lp.Browser.init(app, .{}); const http_client = try app.http.createClient(allocator);
const notification = try lp.Notification.init(app.allocator); defer http_client.deinit();
defer notification.deinit();
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
const notification = try lp.Notification.init(allocator);
defer notification.deinit();
const session = try browser.newSession(notification); const session = try browser.newSession(notification);
defer session.deinit();
var dir = try std.fs.cwd().openDir("src/browser/tests/legacy/", .{ .iterate = true, .no_follow = true }); var dir = try std.fs.cwd().openDir("src/browser/tests/legacy/", .{ .iterate = true, .no_follow = true });
defer dir.close(); defer dir.close();
var walker = try dir.walk(allocator); var walker = try dir.walk(allocator);
defer walker.deinit(); defer walker.deinit();
while (try walker.next()) |entry| { while (try walker.next()) |entry| {
_ = test_arena.reset(.retain_capacity); _ = test_arena.reset(.retain_capacity);
if (entry.kind != .file) { if (entry.kind != .file) {

View File

@@ -69,7 +69,10 @@ pub fn main() !void {
var app = try lp.App.init(allocator, &config); var app = try lp.App.init(allocator, &config);
defer app.deinit(); defer app.deinit();
var browser = try lp.Browser.init(app, .{}); const http_client = try app.http.createClient(allocator);
defer http_client.deinit();
var browser = try lp.Browser.init(app, .{ .http_client = http_client });
defer browser.deinit(); defer browser.deinit();
// An arena for running each tests. Is reset after every test. // An arena for running each tests. Is reset after every test.

View File

@@ -39,6 +39,7 @@ pub fn reset() void {
const App = @import("App.zig"); const App = @import("App.zig");
const js = @import("browser/js/js.zig"); const js = @import("browser/js/js.zig");
const Config = @import("Config.zig"); const Config = @import("Config.zig");
const Client = @import("http/Client.zig");
const Page = @import("browser/Page.zig"); const Page = @import("browser/Page.zig");
const Browser = @import("browser/Browser.zig"); const Browser = @import("browser/Browser.zig");
const Session = @import("browser/Session.zig"); const Session = @import("browser/Session.zig");
@@ -334,6 +335,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool {
} }
pub var test_app: *App = undefined; pub var test_app: *App = undefined;
pub var test_http: *Client = undefined;
pub var test_browser: Browser = undefined; pub var test_browser: Browser = undefined;
pub var test_notification: *Notification = undefined; pub var test_notification: *Notification = undefined;
pub var test_session: *Session = undefined; pub var test_session: *Session = undefined;
@@ -472,7 +474,10 @@ test "tests:beforeAll" {
test_app = try App.init(test_allocator, &test_config); test_app = try App.init(test_allocator, &test_config);
errdefer test_app.deinit(); errdefer test_app.deinit();
test_browser = try Browser.init(test_app, .{}); test_http = try test_app.http.createClient(test_allocator);
errdefer test_http.deinit();
test_browser = try Browser.init(test_app, .{ .http_client = test_http });
errdefer test_browser.deinit(); errdefer test_browser.deinit();
// Create notification for testing // Create notification for testing
@@ -519,6 +524,7 @@ test "tests:afterAll" {
test_notification.deinit(); test_notification.deinit();
test_browser.deinit(); test_browser.deinit();
test_http.deinit();
test_app.deinit(); test_app.deinit();
test_config.deinit(@import("root").tracking_allocator); test_config.deinit(@import("root").tracking_allocator);
} }