From 3e2a4d80531d6e5fc48ccdadea49434e0fac097f Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 24 Feb 2026 08:26:18 +0000 Subject: [PATCH] Move curl_multi to Net layer --- src/Net.zig | 105 ++++++++++++++ src/browser/Page.zig | 5 +- src/http/Client.zig | 261 +++++++++-------------------------- src/telemetry/lightpanda.zig | 4 +- 4 files changed, 173 insertions(+), 202 deletions(-) diff --git a/src/Net.zig b/src/Net.zig index c667a902..0dd20cc5 100644 --- a/src/Net.zig +++ b/src/Net.zig @@ -471,6 +471,7 @@ pub fn globalDeinit() void { pub const Connection = struct { easy: *c.CURL, + node: Handles.HandleList.Node = .{}, pub fn init( ca_blob_: ?c.curl_blob, @@ -700,6 +701,110 @@ pub const Connection = struct { } }; +pub const Handles = struct { + connections: []Connection, + in_use: HandleList, + available: HandleList, + multi: *c.CURLM, + performing: bool = false, + + pub const HandleList = std.DoublyLinkedList; + + pub fn init( + allocator: Allocator, + ca_blob: ?c.curl_blob, + config: *const Config, + ) !Handles { + const count: usize = config.httpMaxConcurrent(); + if (count == 0) return error.InvalidMaxConcurrent; + + const multi = c.curl_multi_init() orelse return error.FailedToInitializeMulti; + errdefer _ = c.curl_multi_cleanup(multi); + + try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, config.httpMaxHostOpen()))); + + const connections = try allocator.alloc(Connection, count); + errdefer allocator.free(connections); + + var available: HandleList = .{}; + for (0..count) |i| { + connections[i] = try Connection.init(ca_blob, config); + available.append(&connections[i].node); + } + + return .{ + .in_use = .{}, + .connections = connections, + .available = available, + .multi = multi, + }; + } + + pub fn deinit(self: *Handles, allocator: Allocator) void { + for (self.connections) |*conn| { + conn.deinit(); + } + allocator.free(self.connections); + _ = c.curl_multi_cleanup(self.multi); + } + + pub fn hasAvailable(self: *const Handles) bool { + return self.available.first != null; + } + + pub fn get(self: *Handles) ?*Connection { + if (self.available.popFirst()) |node| { + node.prev = null; + node.next = null; + self.in_use.append(node); + return @as(*Connection, @fieldParentPtr("node", node)); + } + return null; + } + + pub fn add(self: *Handles, conn: *const Connection) !void { + try errorMCheck(c.curl_multi_add_handle(self.multi, conn.easy)); + } + + pub fn remove(self: *Handles, conn: *Connection) void { + errorMCheck(c.curl_multi_remove_handle(self.multi, conn.easy)) catch |err| { + log.fatal(.http, "multi remove handle", .{ .err = err }); + }; + var node = &conn.node; + self.in_use.remove(node); + node.prev = null; + node.next = null; + self.available.append(node); + } + + pub fn perform(self: *Handles) !c_int { + var running: c_int = undefined; + self.performing = true; + defer self.performing = false; + try errorMCheck(c.curl_multi_perform(self.multi, &running)); + return running; + } + + pub fn poll(self: *Handles, extra_fds: []c.curl_waitfd, timeout_ms: c_int) !void { + try errorMCheck(c.curl_multi_poll(self.multi, extra_fds.ptr, @intCast(extra_fds.len), timeout_ms, null)); + } + + pub const MultiMessage = struct { + conn: Connection, + err: ?Error, + }; + + pub fn readMessage(self: *Handles) ?MultiMessage { + var messages_count: c_int = 0; + const msg_ = c.curl_multi_info_read(self.multi, &messages_count) orelse return null; + const msg: *c.CURLMsg = @ptrCast(msg_); + return .{ + .conn = .{ .easy = msg.easy_handle.? }, + .err = if (errorCheck(msg.data.result)) |_| null else |err| err, + }; + } +}; + // TODO: on BSD / Linux, we could just read the PEM file directly. // This whole rescan + decode is really just needed for MacOS. On Linux // bundle.rescan does find the .pem file(s) which could be in a few different diff --git a/src/browser/Page.zig b/src/browser/Page.zig index 11ff9ebc..b6734627 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -63,6 +63,7 @@ const NavigationKind = @import("webapi/navigation/root.zig").NavigationKind; const KeyboardEvent = @import("webapi/event/KeyboardEvent.zig"); const Http = App.Http; +const Net = @import("../Net.zig"); const ArenaPool = App.ArenaPool; const timestamp = @import("../datetime.zig").timestamp; @@ -1112,8 +1113,8 @@ fn printWaitAnalysis(self: *Page) void { std.debug.print("\nactive requests: {d}\n", .{self._session.browser.http_client.active}); var n_ = self._session.browser.http_client.handles.in_use.first; while (n_) |n| { - const handle: *Http.Client.Handle = @fieldParentPtr("node", n); - const transfer = Http.Transfer.fromEasy(handle.conn.easy) catch |err| { + const conn: *Net.Connection = @fieldParentPtr("node", n); + const transfer = Http.Transfer.fromConnection(conn) catch |err| { std.debug.print(" - failed to load transfer: {any}\n", .{err}); break; }; diff --git a/src/http/Client.zig b/src/http/Client.zig index d3498a01..1878272d 100644 --- a/src/http/Client.zig +++ b/src/http/Client.zig @@ -70,14 +70,7 @@ active: usize, // 'networkAlmostIdle' Page.lifecycleEvent in CDP). intercepted: usize, -// curl has 2 APIs: easy and multi. Multi is like a combination of some I/O block -// (e.g. epoll) and a bunch of pools. You add/remove easys to the multiple and -// then poll the multi. -multi: *c.CURLM, - -// Our easy handles. Although the multi contains buffer pools and connections -// pools, re-using the easys is still recommended. This acts as our own pool -// of easys. +// Our easy handles, managed by a curl multi. handles: Handles, // Use to generate the next request ID @@ -113,10 +106,6 @@ config: *const Config, cdp_client: ?CDPClient = null, -// keep track of when curl_multi_perform is happening so that we can avoid -// recursive calls into curl (which it will fail) -performing: bool = false, - // libcurl can monitor arbitrary sockets, this lets us use libcurl to poll // both HTTP data as well as messages from an CDP connection. // Furthermore, we have some tension between blocking scripts and request @@ -144,21 +133,20 @@ pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, robot_store: *RobotStor const client = try allocator.create(Client); errdefer allocator.destroy(client); - const multi = c.curl_multi_init() orelse return error.FailedToInitializeMulti; - errdefer _ = c.curl_multi_cleanup(multi); - - try errorMCheck(c.curl_multi_setopt(multi, c.CURLMOPT_MAX_HOST_CONNECTIONS, @as(c_long, config.httpMaxHostOpen()))); - - var handles = try Handles.init(allocator, client, ca_blob, config); + var handles = try Handles.init(allocator, ca_blob, config); errdefer handles.deinit(allocator); + // Set transfer callbacks on each connection. + for (handles.connections) |*conn| { + try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback); + } + const http_proxy = config.httpProxy(); client.* = .{ .queue = .{}, .active = 0, .intercepted = 0, - .multi = multi, .handles = handles, .allocator = allocator, .robot_store = robot_store, @@ -175,8 +163,6 @@ pub fn deinit(self: *Client) void { self.abort(); self.handles.deinit(self.allocator); - _ = c.curl_multi_cleanup(self.multi); - self.transfer_pool.deinit(); var robots_iter = self.pending_robots_queue.iterator(); @@ -194,8 +180,8 @@ pub fn newHeaders(self: *const Client) !Net.Headers { pub fn abort(self: *Client) void { while (self.handles.in_use.first) |node| { - const handle: *Handle = @fieldParentPtr("node", node); - var transfer = Transfer.fromConnection(&handle.conn) catch |err| { + const conn: *Net.Connection = @fieldParentPtr("node", node); + var transfer = Transfer.fromConnection(conn) catch |err| { log.err(.http, "get private info", .{ .err = err, .source = "abort" }); continue; }; @@ -215,10 +201,11 @@ pub fn abort(self: *Client) void { if (comptime IS_DEBUG) { std.debug.assert(self.handles.in_use.first == null); - std.debug.assert(self.handles.available.len() == self.handles.handles.len); + std.debug.assert(self.handles.available.len() == self.handles.connections.len); - var running: c_int = undefined; - std.debug.assert(c.curl_multi_perform(self.multi, &running) == c.CURLE_OK); + const running = self.handles.perform() catch |err| { + lp.assert(false, "multi perform in abort", .{ .err = err }); + }; std.debug.assert(running == 0); } } @@ -231,9 +218,9 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { const queue_node = self.queue.popFirst() orelse break; const transfer: *Transfer = @fieldParentPtr("_node", queue_node); - // we know this exists, because we checked isEmpty() above - const handle = self.handles.getFreeHandle().?; - try self.makeRequest(handle, transfer); + // we know this exists, because we checked hasAvailable() above + const conn = self.handles.get().?; + try self.makeRequest(conn, transfer); } return self.perform(@intCast(timeout_ms)); } @@ -514,8 +501,8 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool { // cases, the interecptor is expected to call resume to continue the transfer // or transfer.abort() to abort it. fn process(self: *Client, transfer: *Transfer) !void { - if (self.handles.getFreeHandle()) |handle| { - return self.makeRequest(handle, transfer); + if (self.handles.get()) |conn| { + return self.makeRequest(conn, transfer); } self.queue.append(&transfer._node); @@ -628,8 +615,8 @@ fn requestFailed(transfer: *Transfer, err: anyerror, comptime execute_callback: pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void { try self.ensureNoActiveConnection(); - for (self.handles.handles) |*h| { - try h.conn.setProxy(proxy.ptr); + for (self.handles.connections) |*conn| { + try conn.setProxy(proxy.ptr); } self.use_proxy = true; } @@ -640,8 +627,8 @@ pub fn restoreOriginalProxy(self: *Client) !void { try self.ensureNoActiveConnection(); const proxy = if (self.http_proxy) |p| p.ptr else null; - for (self.handles.handles) |*h| { - try h.conn.setProxy(proxy); + for (self.handles.connections) |*conn| { + try conn.setProxy(proxy); } self.use_proxy = proxy != null; } @@ -651,8 +638,8 @@ pub fn enableTlsVerify(self: *const Client) !void { // Remove inflight connections check on enable TLS b/c chromiumoxide calls // the command during navigate and Curl seems to accept it... - for (self.handles.handles) |*h| { - try h.conn.setTlsVerify(true, self.use_proxy); + for (self.handles.connections) |*conn| { + try conn.setTlsVerify(true, self.use_proxy); } } @@ -661,17 +648,16 @@ pub fn disableTlsVerify(self: *const Client) !void { // Remove inflight connections check on disable TLS b/c chromiumoxide calls // the command during navigate and Curl seems to accept it... - for (self.handles.handles) |*h| { - try h.conn.setTlsVerify(false, self.use_proxy); + for (self.handles.connections) |*conn| { + try conn.setTlsVerify(false, self.use_proxy); } } -fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) anyerror!void { - const conn = &handle.conn; +fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerror!void { const req = &transfer.req; { - transfer._handle = handle; + transfer._conn = conn; errdefer transfer.deinit(); try conn.setURL(req.url); @@ -704,11 +690,12 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) anyerror!voi // fails BEFORE `curl_multi_add_handle` suceeds, the we still need to do // cleanup. But if things fail after `curl_multi_add_handle`, we expect // perfom to pickup the failure and cleanup. - try errorMCheck(c.curl_multi_add_handle(self.multi, conn.easy)); + try self.handles.add(conn); if (req.start_callback) |cb| { cb(transfer) catch |err| { - try errorMCheck(c.curl_multi_remove_handle(self.multi, conn.easy)); + self.handles.remove(conn); + transfer._conn = null; transfer.deinit(); return err; }; @@ -724,14 +711,7 @@ pub const PerformStatus = enum { }; fn perform(self: *Client, timeout_ms: c_int) !PerformStatus { - const multi = self.multi; - var running: c_int = undefined; - - { - self.performing = true; - defer self.performing = false; - try errorMCheck(c.curl_multi_perform(multi, &running)); - } + const running = try self.handles.perform(); // We're potentially going to block for a while until we get data. Process // whatever messages we have waiting ahead of time. @@ -741,18 +721,17 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus { var status = PerformStatus.normal; if (self.cdp_client) |cdp_client| { - var wait_fd = c.curl_waitfd{ + var wait_fds = [_]c.curl_waitfd{.{ .fd = cdp_client.socket, .events = c.CURL_WAIT_POLLIN, .revents = 0, - }; - try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null)); - if (wait_fd.revents != 0) { - // the extra socket we passed in is ready, let's signal our caller + }}; + try self.handles.poll(&wait_fds, timeout_ms); + if (wait_fds[0].revents != 0) { status = .cdp_socket; } } else if (running > 0) { - try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null)); + try self.handles.poll(&.{}, timeout_ms); } _ = try self.processMessages(); @@ -760,19 +739,9 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus { } fn processMessages(self: *Client) !bool { - const multi = self.multi; var processed = false; - var messages_count: c_int = 0; - while (c.curl_multi_info_read(multi, &messages_count)) |msg_| { - const msg: *c.CURLMsg = @ptrCast(msg_); - // This is the only possible message type from CURL for now. - if (comptime IS_DEBUG) { - std.debug.assert(msg.msg == c.CURLMSG_DONE); - } - - const easy = msg.easy_handle.?; - const conn: Net.Connection = .{ .easy = easy }; - const transfer = try Transfer.fromConnection(&conn); + while (self.handles.readMessage()) |msg| { + const transfer = try Transfer.fromConnection(&msg.conn); // In case of auth challenge // TODO give a way to configure the number of auth retries. @@ -822,11 +791,13 @@ fn processMessages(self: *Client) !bool { defer transfer.deinit(); - if (errorCheck(msg.data.result)) blk: { + if (msg.err) |err| { + requestFailed(transfer, err, true); + } else blk: { // In case of request w/o data, we need to call the header done // callback now. if (!transfer._header_done_called) { - const proceed = transfer.headerDoneCallback(&conn) catch |err| { + const proceed = transfer.headerDoneCallback(&msg.conn) catch |err| { log.err(.http, "header_done_callback", .{ .err = err }); requestFailed(transfer, err, true); continue; @@ -847,22 +818,15 @@ fn processMessages(self: *Client) !bool { .transfer = transfer, }); processed = true; - } else |err| { - requestFailed(transfer, err, true); } } return processed; } fn endTransfer(self: *Client, transfer: *Transfer) void { - const handle = transfer._handle.?; - - errorMCheck(c.curl_multi_remove_handle(self.multi, handle.conn.easy)) catch |err| { - log.fatal(.http, "Failed to remove handle", .{ .err = err }); - }; - - self.handles.release(handle); - transfer._handle = null; + const conn = transfer._conn.?; + self.handles.remove(conn); + transfer._conn = null; self.active -= 1; } @@ -872,96 +836,7 @@ fn ensureNoActiveConnection(self: *const Client) !void { } } -const Handles = struct { - handles: []Handle, - in_use: HandleList, - available: HandleList, - - const HandleList = std.DoublyLinkedList; - - fn init( - allocator: Allocator, - client: *Client, - ca_blob: ?c.curl_blob, - config: *const Config, - ) !Handles { - const count: usize = config.httpMaxConcurrent(); - if (count == 0) return error.InvalidMaxConcurrent; - - const handles = try allocator.alloc(Handle, count); - errdefer allocator.free(handles); - - var available: HandleList = .{}; - for (0..count) |i| { - handles[i] = try Handle.init(client, ca_blob, config); - available.append(&handles[i].node); - } - - return .{ - .in_use = .{}, - .handles = handles, - .available = available, - }; - } - - fn deinit(self: *Handles, allocator: Allocator) void { - for (self.handles) |*h| { - h.deinit(); - } - allocator.free(self.handles); - } - - fn hasAvailable(self: *const Handles) bool { - return self.available.first != null; - } - - fn getFreeHandle(self: *Handles) ?*Handle { - if (self.available.popFirst()) |node| { - node.prev = null; - node.next = null; - self.in_use.append(node); - return @as(*Handle, @fieldParentPtr("node", node)); - } - return null; - } - - fn release(self: *Handles, handle: *Handle) void { - var node = &handle.node; - self.in_use.remove(node); - node.prev = null; - node.next = null; - self.available.append(node); - } -}; - -// wraps a c.CURL (an easy handle) -pub const Handle = struct { - client: *Client, - conn: Net.Connection, - node: Handles.HandleList.Node, - - fn init( - client: *Client, - ca_blob: ?c.curl_blob, - config: *const Config, - ) !Handle { - var conn = try Net.Connection.init(ca_blob, config); - errdefer conn.deinit(); - - // callbacks - try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback); - - return .{ - .node = .{}, - .conn = conn, - .client = client, - }; - } - - fn deinit(self: *const Handle) void { - self.conn.deinit(); - } -}; +const Handles = Net.Handles; pub const RequestCookie = struct { is_http: bool, @@ -1059,7 +934,7 @@ pub const Transfer = struct { _notified_fail: bool = false, - _handle: ?*Handle = null, + _conn: ?*Net.Connection = null, _redirecting: bool = false, _auth_challenge: ?AuthChallenge = null, @@ -1102,8 +977,8 @@ pub const Transfer = struct { fn deinit(self: *Transfer) void { self.req.headers.deinit(); - if (self._handle) |handle| { - self.client.handles.release(handle); + if (self._conn) |conn| { + self.client.handles.remove(conn); } self.arena.deinit(); self.client.transfer_pool.destroy(self); @@ -1141,10 +1016,6 @@ pub const Transfer = struct { return writer.print("{s} {s}", .{ @tagName(req.method), req.url }); } - pub fn addHeader(self: *Transfer, value: [:0]const u8) !void { - self._request_header_list = c.curl_slist_append(self._request_header_list, value); - } - pub fn updateURL(self: *Transfer, url: [:0]const u8) !void { // for cookies self.url = url; @@ -1175,13 +1046,13 @@ pub const Transfer = struct { pub fn abort(self: *Transfer, err: anyerror) void { requestFailed(self, err, true); - if (self._handle == null) { + if (self._conn == null) { self.deinit(); return; } const client = self.client; - if (client.performing) { + if (client.handles.performing) { // We're currently in a curl_multi_perform. We cannot call endTransfer // as that calls curl_multi_remove_handle, and you can't do that // from a curl callback. Instead, we flag this transfer and all of @@ -1191,7 +1062,7 @@ pub const Transfer = struct { return; } - if (self._handle != null) { + if (self._conn != null) { client.endTransfer(self); } self.deinit(); @@ -1199,7 +1070,7 @@ pub const Transfer = struct { pub fn terminate(self: *Transfer) void { requestFailed(self, error.Shutdown, false); - if (self._handle != null) { + if (self._conn != null) { self.client.endTransfer(self); } self.deinit(); @@ -1208,7 +1079,7 @@ pub const Transfer = struct { // internal, when the page is shutting down. Doesn't have the same ceremony // as abort (doesn't send a notification, doesn't invoke an error callback) fn kill(self: *Transfer) void { - if (self._handle != null) { + if (self._conn != null) { self.client.endTransfer(self); } if (self.req.shutdown_callback) |cb| { @@ -1503,10 +1374,10 @@ pub const Transfer = struct { } pub fn responseHeaderIterator(self: *Transfer) HeaderIterator { - if (self._handle) |handle| { - // If we have a handle, than this is a real curl request and we + if (self._conn) |conn| { + // If we have a connection, than this is a real curl request and we // iterate through the header that curl maintains. - return .{ .curl = .{ .conn = &handle.conn } }; + return .{ .curl = .{ .conn = conn } }; } // If there's no handle, it either means this is being called before // the request is even being made (which would be a bug in the code) @@ -1520,14 +1391,8 @@ pub const Transfer = struct { return @ptrCast(@alignCast(private)); } - // pub because Page.printWaitAnalysis uses it - pub fn fromEasy(easy: *c.CURL) !*Transfer { - const conn: Net.Connection = .{ .easy = easy }; - return fromConnection(&conn); - } - pub fn fulfill(transfer: *Transfer, status: u16, headers: []const Net.Header, body: ?[]const u8) !void { - if (transfer._handle != null) { + if (transfer._conn != null) { // should never happen, should have been intercepted/paused, and then // either continued, aborted or fulfilled once. @branchHint(.unlikely); @@ -1582,10 +1447,10 @@ pub const Transfer = struct { } fn getContentLengthRawValue(self: *const Transfer) ?[]const u8 { - if (self._handle) |handle| { - // If we have a handle, than this is a normal request. We can get the - // header value from the easy handle. - const cl = handle.conn.getResponseHeader("content-length", 0) orelse return null; + if (self._conn) |conn| { + // If we have a connection, than this is a normal request. We can get the + // header value from the connection. + const cl = conn.getResponseHeader("content-length", 0) orelse return null; return cl.value; } diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 8d27f135..d141e060 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -7,7 +7,7 @@ const Allocator = std.mem.Allocator; const log = @import("../log.zig"); const App = @import("../App.zig"); -const Http = @import("../http/Http.zig"); +const Net = @import("../Net.zig"); const Config = @import("../Config.zig"); const telemetry = @import("telemetry.zig"); @@ -20,7 +20,7 @@ pub const LightPanda = struct { allocator: Allocator, mutex: std.Thread.Mutex, cond: Thread.Condition, - connection: Http.Connection, + connection: Net.Connection, config: *const Config, pending: std.DoublyLinkedList, mem_pool: std.heap.MemoryPool(LightPandaEvent),