Move curl_multi to Net layer

This commit is contained in:
Nikolay Govorov
2026-02-24 08:26:18 +00:00
parent 29982e2caf
commit 3e2a4d8053
4 changed files with 173 additions and 202 deletions

View File

@@ -471,6 +471,7 @@ pub fn globalDeinit() void {
pub const Connection = struct {
easy: *c.CURL,
node: Handles.HandleList.Node = .{},
pub fn init(
ca_blob_: ?c.curl_blob,
@@ -700,6 +701,110 @@ pub const Connection = struct {
}
};
pub const Handles = struct {
connections: []Connection,
in_use: HandleList,
available: HandleList,
multi: *c.CURLM,
performing: bool = false,
pub const HandleList = std.DoublyLinkedList;
pub fn init(
allocator: Allocator,
ca_blob: ?c.curl_blob,
config: *const Config,
) !Handles {
const count: usize = config.httpMaxConcurrent();
if (count == 0) return error.InvalidMaxConcurrent;
const multi = c.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer _ = c.curl_multi_cleanup(multi);
try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, config.httpMaxHostOpen())));
const connections = try allocator.alloc(Connection, count);
errdefer allocator.free(connections);
var available: HandleList = .{};
for (0..count) |i| {
connections[i] = try Connection.init(ca_blob, config);
available.append(&connections[i].node);
}
return .{
.in_use = .{},
.connections = connections,
.available = available,
.multi = multi,
};
}
pub fn deinit(self: *Handles, allocator: Allocator) void {
for (self.connections) |*conn| {
conn.deinit();
}
allocator.free(self.connections);
_ = c.curl_multi_cleanup(self.multi);
}
pub fn hasAvailable(self: *const Handles) bool {
return self.available.first != null;
}
pub fn get(self: *Handles) ?*Connection {
if (self.available.popFirst()) |node| {
node.prev = null;
node.next = null;
self.in_use.append(node);
return @as(*Connection, @fieldParentPtr("node", node));
}
return null;
}
pub fn add(self: *Handles, conn: *const Connection) !void {
try errorMCheck(c.curl_multi_add_handle(self.multi, conn.easy));
}
pub fn remove(self: *Handles, conn: *Connection) void {
errorMCheck(c.curl_multi_remove_handle(self.multi, conn.easy)) catch |err| {
log.fatal(.http, "multi remove handle", .{ .err = err });
};
var node = &conn.node;
self.in_use.remove(node);
node.prev = null;
node.next = null;
self.available.append(node);
}
pub fn perform(self: *Handles) !c_int {
var running: c_int = undefined;
self.performing = true;
defer self.performing = false;
try errorMCheck(c.curl_multi_perform(self.multi, &running));
return running;
}
pub fn poll(self: *Handles, extra_fds: []c.curl_waitfd, timeout_ms: c_int) !void {
try errorMCheck(c.curl_multi_poll(self.multi, extra_fds.ptr, @intCast(extra_fds.len), timeout_ms, null));
}
pub const MultiMessage = struct {
conn: Connection,
err: ?Error,
};
pub fn readMessage(self: *Handles) ?MultiMessage {
var messages_count: c_int = 0;
const msg_ = c.curl_multi_info_read(self.multi, &messages_count) orelse return null;
const msg: *c.CURLMsg = @ptrCast(msg_);
return .{
.conn = .{ .easy = msg.easy_handle.? },
.err = if (errorCheck(msg.data.result)) |_| null else |err| err,
};
}
};
// TODO: on BSD / Linux, we could just read the PEM file directly.
// This whole rescan + decode is really just needed for MacOS. On Linux
// bundle.rescan does find the .pem file(s) which could be in a few different

View File

@@ -63,6 +63,7 @@ const NavigationKind = @import("webapi/navigation/root.zig").NavigationKind;
const KeyboardEvent = @import("webapi/event/KeyboardEvent.zig");
const Http = App.Http;
const Net = @import("../Net.zig");
const ArenaPool = App.ArenaPool;
const timestamp = @import("../datetime.zig").timestamp;
@@ -1112,8 +1113,8 @@ fn printWaitAnalysis(self: *Page) void {
std.debug.print("\nactive requests: {d}\n", .{self._session.browser.http_client.active});
var n_ = self._session.browser.http_client.handles.in_use.first;
while (n_) |n| {
const handle: *Http.Client.Handle = @fieldParentPtr("node", n);
const transfer = Http.Transfer.fromEasy(handle.conn.easy) catch |err| {
const conn: *Net.Connection = @fieldParentPtr("node", n);
const transfer = Http.Transfer.fromConnection(conn) catch |err| {
std.debug.print(" - failed to load transfer: {any}\n", .{err});
break;
};

View File

@@ -70,14 +70,7 @@ active: usize,
// 'networkAlmostIdle' Page.lifecycleEvent in CDP).
intercepted: usize,
// curl has 2 APIs: easy and multi. Multi is like a combination of some I/O block
// (e.g. epoll) and a bunch of pools. You add/remove easys to the multiple and
// then poll the multi.
multi: *c.CURLM,
// Our easy handles. Although the multi contains buffer pools and connections
// pools, re-using the easys is still recommended. This acts as our own pool
// of easys.
// Our easy handles, managed by a curl multi.
handles: Handles,
// Use to generate the next request ID
@@ -113,10 +106,6 @@ config: *const Config,
cdp_client: ?CDPClient = null,
// keep track of when curl_multi_perform is happening so that we can avoid
// recursive calls into curl (which it will fail)
performing: bool = false,
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
// both HTTP data as well as messages from an CDP connection.
// Furthermore, we have some tension between blocking scripts and request
@@ -144,21 +133,20 @@ pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, robot_store: *RobotStor
const client = try allocator.create(Client);
errdefer allocator.destroy(client);
const multi = c.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer _ = c.curl_multi_cleanup(multi);
try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, config.httpMaxHostOpen())));
var handles = try Handles.init(allocator, client, ca_blob, config);
var handles = try Handles.init(allocator, ca_blob, config);
errdefer handles.deinit(allocator);
// Set transfer callbacks on each connection.
for (handles.connections) |*conn| {
try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback);
}
const http_proxy = config.httpProxy();
client.* = .{
.queue = .{},
.active = 0,
.intercepted = 0,
.multi = multi,
.handles = handles,
.allocator = allocator,
.robot_store = robot_store,
@@ -175,8 +163,6 @@ pub fn deinit(self: *Client) void {
self.abort();
self.handles.deinit(self.allocator);
_ = c.curl_multi_cleanup(self.multi);
self.transfer_pool.deinit();
var robots_iter = self.pending_robots_queue.iterator();
@@ -194,8 +180,8 @@ pub fn newHeaders(self: *const Client) !Net.Headers {
pub fn abort(self: *Client) void {
while (self.handles.in_use.first) |node| {
const handle: *Handle = @fieldParentPtr("node", node);
var transfer = Transfer.fromConnection(&handle.conn) catch |err| {
const conn: *Net.Connection = @fieldParentPtr("node", node);
var transfer = Transfer.fromConnection(conn) catch |err| {
log.err(.http, "get private info", .{ .err = err, .source = "abort" });
continue;
};
@@ -215,10 +201,11 @@ pub fn abort(self: *Client) void {
if (comptime IS_DEBUG) {
std.debug.assert(self.handles.in_use.first == null);
std.debug.assert(self.handles.available.len() == self.handles.handles.len);
std.debug.assert(self.handles.available.len() == self.handles.connections.len);
var running: c_int = undefined;
std.debug.assert(c.curl_multi_perform(self.multi, &running) == c.CURLE_OK);
const running = self.handles.perform() catch |err| {
lp.assert(false, "multi perform in abort", .{ .err = err });
};
std.debug.assert(running == 0);
}
}
@@ -231,9 +218,9 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
const queue_node = self.queue.popFirst() orelse break;
const transfer: *Transfer = @fieldParentPtr("_node", queue_node);
// we know this exists, because we checked isEmpty() above
const handle = self.handles.getFreeHandle().?;
try self.makeRequest(handle, transfer);
// we know this exists, because we checked hasAvailable() above
const conn = self.handles.get().?;
try self.makeRequest(conn, transfer);
}
return self.perform(@intCast(timeout_ms));
}
@@ -514,8 +501,8 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool {
// cases, the interecptor is expected to call resume to continue the transfer
// or transfer.abort() to abort it.
fn process(self: *Client, transfer: *Transfer) !void {
if (self.handles.getFreeHandle()) |handle| {
return self.makeRequest(handle, transfer);
if (self.handles.get()) |conn| {
return self.makeRequest(conn, transfer);
}
self.queue.append(&transfer._node);
@@ -628,8 +615,8 @@ fn requestFailed(transfer: *Transfer, err: anyerror, comptime execute_callback:
pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void {
try self.ensureNoActiveConnection();
for (self.handles.handles) |*h| {
try h.conn.setProxy(proxy.ptr);
for (self.handles.connections) |*conn| {
try conn.setProxy(proxy.ptr);
}
self.use_proxy = true;
}
@@ -640,8 +627,8 @@ pub fn restoreOriginalProxy(self: *Client) !void {
try self.ensureNoActiveConnection();
const proxy = if (self.http_proxy) |p| p.ptr else null;
for (self.handles.handles) |*h| {
try h.conn.setProxy(proxy);
for (self.handles.connections) |*conn| {
try conn.setProxy(proxy);
}
self.use_proxy = proxy != null;
}
@@ -651,8 +638,8 @@ pub fn enableTlsVerify(self: *const Client) !void {
// Remove inflight connections check on enable TLS b/c chromiumoxide calls
// the command during navigate and Curl seems to accept it...
for (self.handles.handles) |*h| {
try h.conn.setTlsVerify(true, self.use_proxy);
for (self.handles.connections) |*conn| {
try conn.setTlsVerify(true, self.use_proxy);
}
}
@@ -661,17 +648,16 @@ pub fn disableTlsVerify(self: *const Client) !void {
// Remove inflight connections check on disable TLS b/c chromiumoxide calls
// the command during navigate and Curl seems to accept it...
for (self.handles.handles) |*h| {
try h.conn.setTlsVerify(false, self.use_proxy);
for (self.handles.connections) |*conn| {
try conn.setTlsVerify(false, self.use_proxy);
}
}
fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) anyerror!void {
const conn = &handle.conn;
fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerror!void {
const req = &transfer.req;
{
transfer._handle = handle;
transfer._conn = conn;
errdefer transfer.deinit();
try conn.setURL(req.url);
@@ -704,11 +690,12 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) anyerror!voi
// fails BEFORE `curl_multi_add_handle` suceeds, the we still need to do
// cleanup. But if things fail after `curl_multi_add_handle`, we expect
// perfom to pickup the failure and cleanup.
try errorMCheck(c.curl_multi_add_handle(self.multi, conn.easy));
try self.handles.add(conn);
if (req.start_callback) |cb| {
cb(transfer) catch |err| {
try errorMCheck(c.curl_multi_remove_handle(self.multi, conn.easy));
self.handles.remove(conn);
transfer._conn = null;
transfer.deinit();
return err;
};
@@ -724,14 +711,7 @@ pub const PerformStatus = enum {
};
fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
const multi = self.multi;
var running: c_int = undefined;
{
self.performing = true;
defer self.performing = false;
try errorMCheck(c.curl_multi_perform(multi, &running));
}
const running = try self.handles.perform();
// We're potentially going to block for a while until we get data. Process
// whatever messages we have waiting ahead of time.
@@ -741,18 +721,17 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
var status = PerformStatus.normal;
if (self.cdp_client) |cdp_client| {
var wait_fd = c.curl_waitfd{
var wait_fds = [_]c.curl_waitfd{.{
.fd = cdp_client.socket,
.events = c.CURL_WAIT_POLLIN,
.revents = 0,
};
try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
if (wait_fd.revents != 0) {
// the extra socket we passed in is ready, let's signal our caller
}};
try self.handles.poll(&wait_fds, timeout_ms);
if (wait_fds[0].revents != 0) {
status = .cdp_socket;
}
} else if (running > 0) {
try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
try self.handles.poll(&.{}, timeout_ms);
}
_ = try self.processMessages();
@@ -760,19 +739,9 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
}
fn processMessages(self: *Client) !bool {
const multi = self.multi;
var processed = false;
var messages_count: c_int = 0;
while (c.curl_multi_info_read(multi, &messages_count)) |msg_| {
const msg: *c.CURLMsg = @ptrCast(msg_);
// This is the only possible message type from CURL for now.
if (comptime IS_DEBUG) {
std.debug.assert(msg.msg == c.CURLMSG_DONE);
}
const easy = msg.easy_handle.?;
const conn: Net.Connection = .{ .easy = easy };
const transfer = try Transfer.fromConnection(&conn);
while (self.handles.readMessage()) |msg| {
const transfer = try Transfer.fromConnection(&msg.conn);
// In case of auth challenge
// TODO give a way to configure the number of auth retries.
@@ -822,11 +791,13 @@ fn processMessages(self: *Client) !bool {
defer transfer.deinit();
if (errorCheck(msg.data.result)) blk: {
if (msg.err) |err| {
requestFailed(transfer, err, true);
} else blk: {
// In case of request w/o data, we need to call the header done
// callback now.
if (!transfer._header_done_called) {
const proceed = transfer.headerDoneCallback(&conn) catch |err| {
const proceed = transfer.headerDoneCallback(&msg.conn) catch |err| {
log.err(.http, "header_done_callback", .{ .err = err });
requestFailed(transfer, err, true);
continue;
@@ -847,22 +818,15 @@ fn processMessages(self: *Client) !bool {
.transfer = transfer,
});
processed = true;
} else |err| {
requestFailed(transfer, err, true);
}
}
return processed;
}
fn endTransfer(self: *Client, transfer: *Transfer) void {
const handle = transfer._handle.?;
errorMCheck(c.curl_multi_remove_handle(self.multi, handle.conn.easy)) catch |err| {
log.fatal(.http, "Failed to remove handle", .{ .err = err });
};
self.handles.release(handle);
transfer._handle = null;
const conn = transfer._conn.?;
self.handles.remove(conn);
transfer._conn = null;
self.active -= 1;
}
@@ -872,96 +836,7 @@ fn ensureNoActiveConnection(self: *const Client) !void {
}
}
const Handles = struct {
handles: []Handle,
in_use: HandleList,
available: HandleList,
const HandleList = std.DoublyLinkedList;
fn init(
allocator: Allocator,
client: *Client,
ca_blob: ?c.curl_blob,
config: *const Config,
) !Handles {
const count: usize = config.httpMaxConcurrent();
if (count == 0) return error.InvalidMaxConcurrent;
const handles = try allocator.alloc(Handle, count);
errdefer allocator.free(handles);
var available: HandleList = .{};
for (0..count) |i| {
handles[i] = try Handle.init(client, ca_blob, config);
available.append(&handles[i].node);
}
return .{
.in_use = .{},
.handles = handles,
.available = available,
};
}
fn deinit(self: *Handles, allocator: Allocator) void {
for (self.handles) |*h| {
h.deinit();
}
allocator.free(self.handles);
}
fn hasAvailable(self: *const Handles) bool {
return self.available.first != null;
}
fn getFreeHandle(self: *Handles) ?*Handle {
if (self.available.popFirst()) |node| {
node.prev = null;
node.next = null;
self.in_use.append(node);
return @as(*Handle, @fieldParentPtr("node", node));
}
return null;
}
fn release(self: *Handles, handle: *Handle) void {
var node = &handle.node;
self.in_use.remove(node);
node.prev = null;
node.next = null;
self.available.append(node);
}
};
// wraps a c.CURL (an easy handle)
pub const Handle = struct {
client: *Client,
conn: Net.Connection,
node: Handles.HandleList.Node,
fn init(
client: *Client,
ca_blob: ?c.curl_blob,
config: *const Config,
) !Handle {
var conn = try Net.Connection.init(ca_blob, config);
errdefer conn.deinit();
// callbacks
try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback);
return .{
.node = .{},
.conn = conn,
.client = client,
};
}
fn deinit(self: *const Handle) void {
self.conn.deinit();
}
};
const Handles = Net.Handles;
pub const RequestCookie = struct {
is_http: bool,
@@ -1059,7 +934,7 @@ pub const Transfer = struct {
_notified_fail: bool = false,
_handle: ?*Handle = null,
_conn: ?*Net.Connection = null,
_redirecting: bool = false,
_auth_challenge: ?AuthChallenge = null,
@@ -1102,8 +977,8 @@ pub const Transfer = struct {
fn deinit(self: *Transfer) void {
self.req.headers.deinit();
if (self._handle) |handle| {
self.client.handles.release(handle);
if (self._conn) |conn| {
self.client.handles.remove(conn);
}
self.arena.deinit();
self.client.transfer_pool.destroy(self);
@@ -1141,10 +1016,6 @@ pub const Transfer = struct {
return writer.print("{s} {s}", .{ @tagName(req.method), req.url });
}
pub fn addHeader(self: *Transfer, value: [:0]const u8) !void {
self._request_header_list = c.curl_slist_append(self._request_header_list, value);
}
pub fn updateURL(self: *Transfer, url: [:0]const u8) !void {
// for cookies
self.url = url;
@@ -1175,13 +1046,13 @@ pub const Transfer = struct {
pub fn abort(self: *Transfer, err: anyerror) void {
requestFailed(self, err, true);
if (self._handle == null) {
if (self._conn == null) {
self.deinit();
return;
}
const client = self.client;
if (client.performing) {
if (client.handles.performing) {
// We're currently in a curl_multi_perform. We cannot call endTransfer
// as that calls curl_multi_remove_handle, and you can't do that
// from a curl callback. Instead, we flag this transfer and all of
@@ -1191,7 +1062,7 @@ pub const Transfer = struct {
return;
}
if (self._handle != null) {
if (self._conn != null) {
client.endTransfer(self);
}
self.deinit();
@@ -1199,7 +1070,7 @@ pub const Transfer = struct {
pub fn terminate(self: *Transfer) void {
requestFailed(self, error.Shutdown, false);
if (self._handle != null) {
if (self._conn != null) {
self.client.endTransfer(self);
}
self.deinit();
@@ -1208,7 +1079,7 @@ pub const Transfer = struct {
// internal, when the page is shutting down. Doesn't have the same ceremony
// as abort (doesn't send a notification, doesn't invoke an error callback)
fn kill(self: *Transfer) void {
if (self._handle != null) {
if (self._conn != null) {
self.client.endTransfer(self);
}
if (self.req.shutdown_callback) |cb| {
@@ -1503,10 +1374,10 @@ pub const Transfer = struct {
}
pub fn responseHeaderIterator(self: *Transfer) HeaderIterator {
if (self._handle) |handle| {
// If we have a handle, than this is a real curl request and we
if (self._conn) |conn| {
// If we have a connection, than this is a real curl request and we
// iterate through the header that curl maintains.
return .{ .curl = .{ .conn = &handle.conn } };
return .{ .curl = .{ .conn = conn } };
}
// If there's no handle, it either means this is being called before
// the request is even being made (which would be a bug in the code)
@@ -1520,14 +1391,8 @@ pub const Transfer = struct {
return @ptrCast(@alignCast(private));
}
// pub because Page.printWaitAnalysis uses it
pub fn fromEasy(easy: *c.CURL) !*Transfer {
const conn: Net.Connection = .{ .easy = easy };
return fromConnection(&conn);
}
pub fn fulfill(transfer: *Transfer, status: u16, headers: []const Net.Header, body: ?[]const u8) !void {
if (transfer._handle != null) {
if (transfer._conn != null) {
// should never happen, should have been intercepted/paused, and then
// either continued, aborted or fulfilled once.
@branchHint(.unlikely);
@@ -1582,10 +1447,10 @@ pub const Transfer = struct {
}
fn getContentLengthRawValue(self: *const Transfer) ?[]const u8 {
if (self._handle) |handle| {
// If we have a handle, than this is a normal request. We can get the
// header value from the easy handle.
const cl = handle.conn.getResponseHeader("content-length", 0) orelse return null;
if (self._conn) |conn| {
// If we have a connection, than this is a normal request. We can get the
// header value from the connection.
const cl = conn.getResponseHeader("content-length", 0) orelse return null;
return cl.value;
}

View File

@@ -7,7 +7,7 @@ const Allocator = std.mem.Allocator;
const log = @import("../log.zig");
const App = @import("../App.zig");
const Http = @import("../http/Http.zig");
const Net = @import("../Net.zig");
const Config = @import("../Config.zig");
const telemetry = @import("telemetry.zig");
@@ -20,7 +20,7 @@ pub const LightPanda = struct {
allocator: Allocator,
mutex: std.Thread.Mutex,
cond: Thread.Condition,
connection: Http.Connection,
connection: Net.Connection,
config: *const Config,
pending: std.DoublyLinkedList,
mem_pool: std.heap.MemoryPool(LightPandaEvent),