From a95b4ea7b95004b8ea4c9516b97fbbc16f73e1e6 Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Wed, 11 Mar 2026 05:44:59 +0000 Subject: [PATCH] Use global connections poll --- src/browser/HttpClient.zig | 124 ++++++++++++++++++++++++++----------- src/network/Runtime.zig | 34 ++++++++++ src/network/http.zig | 103 +++++------------------------- src/sys/libcurl.zig | 14 ++++- 4 files changed, 150 insertions(+), 125 deletions(-) diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 98292efc..529d4e5a 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -66,9 +66,18 @@ active: usize, // 'networkAlmostIdle' Page.lifecycleEvent in CDP). intercepted: usize, -// Our easy handles, managed by a curl multi. +// Our curl multi handle. handles: Net.Handles, +// Connections currently in this client's curl_multi. +in_use: std.DoublyLinkedList = .{}, + +// Connections that failed to be removed from curl_multi during perform. +dirty: std.DoublyLinkedList = .{}, + +// Whether we're currently inside a curl_multi_perform call. +performing: bool = false, + // Use to generate the next request ID next_request_id: u32 = 0, @@ -88,8 +97,8 @@ pending_robots_queue: std.StringHashMapUnmanaged(std.ArrayList(Request)) = .empt // request. These wil come and go with each request. transfer_pool: std.heap.MemoryPool(Transfer), -// only needed for CDP which can change the proxy and then restore it. When -// restoring, this originally-configured value is what it goes to. +// The current proxy. CDP can change it, restoreOriginalProxy restores +// from config. http_proxy: ?[:0]const u8 = null, // track if the client use a proxy for connections. @@ -97,6 +106,9 @@ http_proxy: ?[:0]const u8 = null, // CDP. use_proxy: bool, +// Current TLS verification state, applied per-connection in makeRequest. +tls_verify: bool = true, + cdp_client: ?CDPClient = null, // libcurl can monitor arbitrary sockets, this lets us use libcurl to poll @@ -126,13 +138,8 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { const client = try allocator.create(Client); errdefer allocator.destroy(client); - var handles = try Net.Handles.init(allocator, network.ca_blob, network.config); - errdefer handles.deinit(allocator); - - // Set transfer callbacks on each connection. - for (handles.connections) |*conn| { - try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback); - } + var handles = try Net.Handles.init(network.config); + errdefer handles.deinit(); const http_proxy = network.config.httpProxy(); @@ -145,6 +152,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { .network = network, .http_proxy = http_proxy, .use_proxy = http_proxy != null, + .tls_verify = network.config.tlsVerifyHost(), .transfer_pool = transfer_pool, }; @@ -153,7 +161,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { pub fn deinit(self: *Client) void { self.abort(); - self.handles.deinit(self.allocator); + self.handles.deinit(); self.transfer_pool.deinit(); @@ -182,14 +190,14 @@ pub fn abortFrame(self: *Client, frame_id: u32) void { // but abort can avoid the frame_id check at comptime. fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { { - var q = &self.handles.in_use; + var q = &self.in_use; var n = q.first; while (n) |node| { n = node.next; const conn: *Net.Connection = @fieldParentPtr("node", node); var transfer = Transfer.fromConnection(conn) catch |err| { // Let's cleanup what we can - self.handles.remove(conn); + self.removeConn(conn); log.err(.http, "get private info", .{ .err = err, .source = "abort" }); continue; }; @@ -226,8 +234,7 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { } if (comptime IS_DEBUG and abort_all) { - std.debug.assert(self.handles.in_use.first == null); - std.debug.assert(self.handles.available.len() == self.handles.connections.len); + std.debug.assert(self.in_use.first == null); const running = self.handles.perform() catch |err| { lp.assert(false, "multi perform in abort", .{ .err = err }); @@ -237,15 +244,12 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { } pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { - while (true) { - if (self.handles.hasAvailable() == false) { + while (self.queue.popFirst()) |queue_node| { + const conn = self.network.getConnection() orelse { + self.queue.prepend(queue_node); break; - } - const queue_node = self.queue.popFirst() orelse break; + }; const transfer: *Transfer = @fieldParentPtr("_node", queue_node); - - // we know this exists, because we checked hasAvailable() above - const conn = self.handles.get().?; try self.makeRequest(conn, transfer); } return self.perform(@intCast(timeout_ms)); @@ -529,8 +533,8 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool { fn process(self: *Client, transfer: *Transfer) !void { // libcurl doesn't allow recursive calls, if we're in a `perform()` operation // then we _have_ to queue this. - if (self.handles.performing == false) { - if (self.handles.get()) |conn| { + if (self.performing == false) { + if (self.network.getConnection()) |conn| { return self.makeRequest(conn, transfer); } } @@ -645,9 +649,12 @@ fn requestFailed(transfer: *Transfer, err: anyerror, comptime execute_callback: pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void { try self.ensureNoActiveConnection(); - for (self.handles.connections) |*conn| { - try conn.setProxy(proxy.ptr); + var it = self.in_use.first; + while (it) |node| : (it = node.next) { + const conn: *Net.Connection = @fieldParentPtr("node", node); + try conn.setProxy(proxy); } + self.http_proxy = proxy; self.use_proxy = true; } @@ -656,11 +663,13 @@ pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void { pub fn restoreOriginalProxy(self: *Client) !void { try self.ensureNoActiveConnection(); - const proxy = if (self.http_proxy) |p| p.ptr else null; - for (self.handles.connections) |*conn| { - try conn.setProxy(proxy); + self.http_proxy = self.network.config.httpProxy(); + var it = self.in_use.first; + while (it) |node| : (it = node.next) { + const conn: *Net.Connection = @fieldParentPtr("node", node); + try conn.setProxy(self.http_proxy); } - self.use_proxy = proxy != null; + self.use_proxy = self.http_proxy != null; } // Enable TLS verification on all connections. @@ -668,9 +677,12 @@ pub fn enableTlsVerify(self: *Client) !void { // Remove inflight connections check on enable TLS b/c chromiumoxide calls // the command during navigate and Curl seems to accept it... - for (self.handles.connections) |*conn| { + var it = self.in_use.first; + while (it) |node| : (it = node.next) { + const conn: *Net.Connection = @fieldParentPtr("node", node); try conn.setTlsVerify(true, self.use_proxy); } + self.tls_verify = true; } // Disable TLS verification on all connections. @@ -678,9 +690,12 @@ pub fn disableTlsVerify(self: *Client) !void { // Remove inflight connections check on disable TLS b/c chromiumoxide calls // the command during navigate and Curl seems to accept it... - for (self.handles.connections) |*conn| { + var it = self.in_use.first; + while (it) |node| : (it = node.next) { + const conn: *Net.Connection = @fieldParentPtr("node", node); try conn.setTlsVerify(false, self.use_proxy); } + self.tls_verify = false; } fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerror!void { @@ -691,9 +706,14 @@ fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerr errdefer { transfer._conn = null; transfer.deinit(); - self.handles.isAvailable(conn); + self.releaseConn(conn); } + // Set callbacks and per-client settings on the pooled connection. + try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback); + try conn.setProxy(self.http_proxy); + try conn.setTlsVerify(self.tls_verify, self.use_proxy); + try conn.setURL(req.url); try conn.setMethod(req.method); if (req.body) |b| { @@ -728,10 +748,12 @@ fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerr // fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do // cleanup. But if things fail after `curl_multi_add_handle`, we expect // perfom to pickup the failure and cleanup. + self.in_use.append(&conn.node); self.handles.add(conn) catch |err| { transfer._conn = null; transfer.deinit(); - self.handles.isAvailable(conn); + self.in_use.remove(&conn.node); + self.releaseConn(conn); return err; }; @@ -752,7 +774,19 @@ pub const PerformStatus = enum { }; fn perform(self: *Client, timeout_ms: c_int) !PerformStatus { + self.performing = true; const running = try self.handles.perform(); + self.performing = false; + + // Process dirty connections — return them to Runtime pool. + while (self.dirty.popFirst()) |node| { + const conn: *Net.Connection = @fieldParentPtr("node", node); + self.handles.remove(conn) catch |err| { + log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); + @panic("multi_remove_handle"); + }; + self.releaseConn(conn); + } // We're potentially going to block for a while until we get data. Process // whatever messages we have waiting ahead of time. @@ -871,11 +905,27 @@ fn processMessages(self: *Client) !bool { fn endTransfer(self: *Client, transfer: *Transfer) void { const conn = transfer._conn.?; - self.handles.remove(conn); + self.removeConn(conn); transfer._conn = null; self.active -= 1; } +fn removeConn(self: *Client, conn: *Net.Connection) void { + self.in_use.remove(&conn.node); + if (self.handles.remove(conn)) { + self.releaseConn(conn); + } else |_| { + // Can happen if we're in a perform() call, so we'll queue this + // for cleanup later. + self.dirty.append(&conn.node); + } +} + +fn releaseConn(self: *Client, conn: *Net.Connection) void { + conn.reset() catch {}; + self.network.releaseConnection(conn); +} + fn ensureNoActiveConnection(self: *const Client) !void { if (self.active > 0) { return error.InflightConnection; @@ -1023,7 +1073,7 @@ pub const Transfer = struct { fn deinit(self: *Transfer) void { self.req.headers.deinit(); if (self._conn) |conn| { - self.client.handles.remove(conn); + self.client.removeConn(conn); } self.arena.deinit(); self.client.transfer_pool.destroy(self); @@ -1093,7 +1143,7 @@ pub const Transfer = struct { requestFailed(self, err, true); const client = self.client; - if (self._performing or client.handles.performing) { + if (self._performing or client.performing) { // We're currently in a curl_multi_perform. We cannot call endTransfer // as that calls curl_multi_remove_handle, and you can't do that // from a curl callback. Instead, we flag this transfer and all of diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 0112dc18..7c534ee2 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -43,6 +43,10 @@ config: *const Config, ca_blob: ?net_http.Blob, robot_store: RobotStore, +connections: []net_http.Connection, +available: std.DoublyLinkedList = .{}, +conn_mutex: std.Thread.Mutex = .{}, + pollfds: []posix.pollfd, listener: ?Listener = null, @@ -79,11 +83,23 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { ca_blob = try loadCerts(allocator); } + const count: usize = config.httpMaxConcurrent(); + const connections = try allocator.alloc(net_http.Connection, count); + errdefer allocator.free(connections); + + var available: std.DoublyLinkedList = .{}; + for (0..count) |i| { + connections[i] = try net_http.Connection.init(ca_blob, config); + available.append(&connections[i].node); + } + return .{ .allocator = allocator, .config = config, .ca_blob = ca_blob, .robot_store = RobotStore.init(allocator), + .connections = connections, + .available = available, .pollfds = pollfds, .wakeup_pipe = pipe, }; @@ -104,6 +120,11 @@ pub fn deinit(self: *Runtime) void { self.allocator.free(data[0..ca_blob.len]); } + for (self.connections) |*conn| { + conn.deinit(); + } + self.allocator.free(self.connections); + self.robot_store.deinit(); globalDeinit(); @@ -192,6 +213,19 @@ pub fn stop(self: *Runtime) void { _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; } +pub fn getConnection(self: *Runtime) ?*net_http.Connection { + self.conn_mutex.lock(); + defer self.conn_mutex.unlock(); + const node = self.available.popFirst() orelse return null; + return @fieldParentPtr("node", node); +} + +pub fn releaseConnection(self: *Runtime, conn: *net_http.Connection) void { + self.conn_mutex.lock(); + defer self.conn_mutex.unlock(); + self.available.append(&conn.node); +} + pub fn newConnection(self: *Runtime) !net_http.Connection { return net_http.Connection.init(self.ca_blob, self.config); } diff --git a/src/network/http.zig b/src/network/http.zig index 28fd7736..b0f70375 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -237,7 +237,7 @@ pub const ResponseHead = struct { pub const Connection = struct { easy: *libcurl.Curl, - node: Handles.HandleList.Node = .{}, + node: std.DoublyLinkedList.Node = .{}, pub fn init( ca_blob_: ?libcurl.CurlBlob, @@ -385,8 +385,16 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self.easy, .write_function, data_cb); } - pub fn setProxy(self: *const Connection, proxy: ?[*:0]const u8) !void { - try libcurl.curl_easy_setopt(self.easy, .proxy, proxy); + pub fn reset(self: *const Connection) !void { + try libcurl.curl_easy_setopt(self.easy, .header_data, null); + try libcurl.curl_easy_setopt(self.easy, .header_function, null); + try libcurl.curl_easy_setopt(self.easy, .write_data, null); + try libcurl.curl_easy_setopt(self.easy, .write_function, null); + try libcurl.curl_easy_setopt(self.easy, .proxy, null); + } + + pub fn setProxy(self: *const Connection, proxy: ?[:0]const u8) !void { + try libcurl.curl_easy_setopt(self.easy, .proxy, if (proxy) |p| p.ptr else null); } pub fn setTlsVerify(self: *const Connection, verify: bool, use_proxy: bool) !void { @@ -467,111 +475,32 @@ pub const Connection = struct { }; pub const Handles = struct { - connections: []Connection, - dirty: HandleList, - in_use: HandleList, - available: HandleList, multi: *libcurl.CurlM, - performing: bool = false, - - pub const HandleList = std.DoublyLinkedList; - - pub fn init( - allocator: Allocator, - ca_blob: ?libcurl.CurlBlob, - config: *const Config, - ) !Handles { - const count: usize = config.httpMaxConcurrent(); - if (count == 0) return error.InvalidMaxConcurrent; + pub fn init(config: *const Config) !Handles { const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; errdefer libcurl.curl_multi_cleanup(multi) catch {}; try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen()); - const connections = try allocator.alloc(Connection, count); - errdefer allocator.free(connections); - - var available: HandleList = .{}; - for (0..count) |i| { - connections[i] = try Connection.init(ca_blob, config); - available.append(&connections[i].node); - } - - return .{ - .dirty = .{}, - .in_use = .{}, - .connections = connections, - .available = available, - .multi = multi, - }; + return .{ .multi = multi }; } - pub fn deinit(self: *Handles, allocator: Allocator) void { - for (self.connections) |*conn| { - conn.deinit(); - } - allocator.free(self.connections); + pub fn deinit(self: *Handles) void { libcurl.curl_multi_cleanup(self.multi) catch {}; } - pub fn hasAvailable(self: *const Handles) bool { - return self.available.first != null; - } - - pub fn get(self: *Handles) ?*Connection { - if (self.available.popFirst()) |node| { - self.in_use.append(node); - return @as(*Connection, @fieldParentPtr("node", node)); - } - return null; - } - pub fn add(self: *Handles, conn: *const Connection) !void { try libcurl.curl_multi_add_handle(self.multi, conn.easy); } - pub fn remove(self: *Handles, conn: *Connection) void { - if (libcurl.curl_multi_remove_handle(self.multi, conn.easy)) { - self.isAvailable(conn); - } else |err| { - // can happen if we're in a perform() call, so we'll queue this - // for cleanup later. - const node = &conn.node; - self.in_use.remove(node); - self.dirty.append(node); - log.warn(.http, "multi remove handle", .{ .err = err }); - } - } - - pub fn isAvailable(self: *Handles, conn: *Connection) void { - const node = &conn.node; - self.in_use.remove(node); - self.available.append(node); + pub fn remove(self: *Handles, conn: *const Connection) !void { + try libcurl.curl_multi_remove_handle(self.multi, conn.easy); } pub fn perform(self: *Handles) !c_int { - self.performing = true; - defer self.performing = false; - - const multi = self.multi; var running: c_int = undefined; try libcurl.curl_multi_perform(self.multi, &running); - - { - const list = &self.dirty; - while (list.first) |node| { - list.remove(node); - const conn: *Connection = @fieldParentPtr("node", node); - if (libcurl.curl_multi_remove_handle(multi, conn.easy)) { - self.available.append(node); - } else |err| { - log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); - @panic("multi_remove_handle"); - } - } - } - return running; } diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index 759cba25..f9c9ac12 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -543,7 +543,10 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .header_data, .write_data, => blk: { - const ptr: *anyopaque = @ptrCast(value); + const ptr: ?*anyopaque = switch (@typeInfo(@TypeOf(value))) { + .null => null, + else => @ptrCast(value), + }; break :blk c.curl_easy_setopt(easy, opt, ptr); }, @@ -703,6 +706,15 @@ pub fn curl_multi_poll( try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds)); } +pub fn curl_multi_waitfds(multi: *CurlM, ufds: []CurlWaitFd, fd_count: *c_uint) ErrorMulti!void { + const raw_fds: [*c]c.curl_waitfd = if (ufds.len == 0) null else @ptrCast(ufds.ptr); + try errorMCheck(c.curl_multi_waitfds(multi, raw_fds, @intCast(ufds.len), fd_count)); +} + +pub fn curl_multi_timeout(multi: *CurlM, timeout_ms: *c_long) ErrorMulti!void { + try errorMCheck(c.curl_multi_timeout(multi, timeout_ms)); +} + pub fn curl_multi_info_read(multi: *CurlM, msgs_in_queue: *c_int) ?CurlMsg { const ptr = c.curl_multi_info_read(multi, msgs_in_queue); if (ptr == null) return null;