mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-03-22 04:34:44 +00:00
Merge pull request #1781 from lightpanda-io/wp/mrdimidium/telemetry-network
Some checks failed
e2e-test / zig build release (push) Has been cancelled
e2e-test / demo-scripts (push) Has been cancelled
e2e-test / cdp-and-hyperfine-bench (push) Has been cancelled
e2e-test / perf-fmt (push) Has been cancelled
e2e-test / browser fetch (push) Has been cancelled
zig-test / zig test using v8 in debug mode (push) Has been cancelled
zig-test / zig test (push) Has been cancelled
zig-test / perf-fmt (push) Has been cancelled
Some checks failed
e2e-test / zig build release (push) Has been cancelled
e2e-test / demo-scripts (push) Has been cancelled
e2e-test / cdp-and-hyperfine-bench (push) Has been cancelled
e2e-test / perf-fmt (push) Has been cancelled
e2e-test / browser fetch (push) Has been cancelled
zig-test / zig test using v8 in debug mode (push) Has been cancelled
zig-test / zig test (push) Has been cancelled
zig-test / perf-fmt (push) Has been cancelled
Use global connections poll
This commit is contained in:
@@ -66,9 +66,18 @@ active: usize,
|
|||||||
// 'networkAlmostIdle' Page.lifecycleEvent in CDP).
|
// 'networkAlmostIdle' Page.lifecycleEvent in CDP).
|
||||||
intercepted: usize,
|
intercepted: usize,
|
||||||
|
|
||||||
// Our easy handles, managed by a curl multi.
|
// Our curl multi handle.
|
||||||
handles: Net.Handles,
|
handles: Net.Handles,
|
||||||
|
|
||||||
|
// Connections currently in this client's curl_multi.
|
||||||
|
in_use: std.DoublyLinkedList = .{},
|
||||||
|
|
||||||
|
// Connections that failed to be removed from curl_multi during perform.
|
||||||
|
dirty: std.DoublyLinkedList = .{},
|
||||||
|
|
||||||
|
// Whether we're currently inside a curl_multi_perform call.
|
||||||
|
performing: bool = false,
|
||||||
|
|
||||||
// Use to generate the next request ID
|
// Use to generate the next request ID
|
||||||
next_request_id: u32 = 0,
|
next_request_id: u32 = 0,
|
||||||
|
|
||||||
@@ -88,8 +97,8 @@ pending_robots_queue: std.StringHashMapUnmanaged(std.ArrayList(Request)) = .empt
|
|||||||
// request. These wil come and go with each request.
|
// request. These wil come and go with each request.
|
||||||
transfer_pool: std.heap.MemoryPool(Transfer),
|
transfer_pool: std.heap.MemoryPool(Transfer),
|
||||||
|
|
||||||
// only needed for CDP which can change the proxy and then restore it. When
|
// The current proxy. CDP can change it, restoreOriginalProxy restores
|
||||||
// restoring, this originally-configured value is what it goes to.
|
// from config.
|
||||||
http_proxy: ?[:0]const u8 = null,
|
http_proxy: ?[:0]const u8 = null,
|
||||||
|
|
||||||
// track if the client use a proxy for connections.
|
// track if the client use a proxy for connections.
|
||||||
@@ -97,6 +106,9 @@ http_proxy: ?[:0]const u8 = null,
|
|||||||
// CDP.
|
// CDP.
|
||||||
use_proxy: bool,
|
use_proxy: bool,
|
||||||
|
|
||||||
|
// Current TLS verification state, applied per-connection in makeRequest.
|
||||||
|
tls_verify: bool = true,
|
||||||
|
|
||||||
cdp_client: ?CDPClient = null,
|
cdp_client: ?CDPClient = null,
|
||||||
|
|
||||||
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
|
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
|
||||||
@@ -126,13 +138,8 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
|
|||||||
const client = try allocator.create(Client);
|
const client = try allocator.create(Client);
|
||||||
errdefer allocator.destroy(client);
|
errdefer allocator.destroy(client);
|
||||||
|
|
||||||
var handles = try Net.Handles.init(allocator, network.ca_blob, network.config);
|
var handles = try Net.Handles.init(network.config);
|
||||||
errdefer handles.deinit(allocator);
|
errdefer handles.deinit();
|
||||||
|
|
||||||
// Set transfer callbacks on each connection.
|
|
||||||
for (handles.connections) |*conn| {
|
|
||||||
try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback);
|
|
||||||
}
|
|
||||||
|
|
||||||
const http_proxy = network.config.httpProxy();
|
const http_proxy = network.config.httpProxy();
|
||||||
|
|
||||||
@@ -145,6 +152,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
|
|||||||
.network = network,
|
.network = network,
|
||||||
.http_proxy = http_proxy,
|
.http_proxy = http_proxy,
|
||||||
.use_proxy = http_proxy != null,
|
.use_proxy = http_proxy != null,
|
||||||
|
.tls_verify = network.config.tlsVerifyHost(),
|
||||||
.transfer_pool = transfer_pool,
|
.transfer_pool = transfer_pool,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -153,7 +161,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
|
|||||||
|
|
||||||
pub fn deinit(self: *Client) void {
|
pub fn deinit(self: *Client) void {
|
||||||
self.abort();
|
self.abort();
|
||||||
self.handles.deinit(self.allocator);
|
self.handles.deinit();
|
||||||
|
|
||||||
self.transfer_pool.deinit();
|
self.transfer_pool.deinit();
|
||||||
|
|
||||||
@@ -182,14 +190,14 @@ pub fn abortFrame(self: *Client, frame_id: u32) void {
|
|||||||
// but abort can avoid the frame_id check at comptime.
|
// but abort can avoid the frame_id check at comptime.
|
||||||
fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
|
fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
|
||||||
{
|
{
|
||||||
var q = &self.handles.in_use;
|
var q = &self.in_use;
|
||||||
var n = q.first;
|
var n = q.first;
|
||||||
while (n) |node| {
|
while (n) |node| {
|
||||||
n = node.next;
|
n = node.next;
|
||||||
const conn: *Net.Connection = @fieldParentPtr("node", node);
|
const conn: *Net.Connection = @fieldParentPtr("node", node);
|
||||||
var transfer = Transfer.fromConnection(conn) catch |err| {
|
var transfer = Transfer.fromConnection(conn) catch |err| {
|
||||||
// Let's cleanup what we can
|
// Let's cleanup what we can
|
||||||
self.handles.remove(conn);
|
self.removeConn(conn);
|
||||||
log.err(.http, "get private info", .{ .err = err, .source = "abort" });
|
log.err(.http, "get private info", .{ .err = err, .source = "abort" });
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
@@ -226,8 +234,7 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (comptime IS_DEBUG and abort_all) {
|
if (comptime IS_DEBUG and abort_all) {
|
||||||
std.debug.assert(self.handles.in_use.first == null);
|
std.debug.assert(self.in_use.first == null);
|
||||||
std.debug.assert(self.handles.available.len() == self.handles.connections.len);
|
|
||||||
|
|
||||||
const running = self.handles.perform() catch |err| {
|
const running = self.handles.perform() catch |err| {
|
||||||
lp.assert(false, "multi perform in abort", .{ .err = err });
|
lp.assert(false, "multi perform in abort", .{ .err = err });
|
||||||
@@ -237,15 +244,12 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
|
pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
|
||||||
while (true) {
|
while (self.queue.popFirst()) |queue_node| {
|
||||||
if (self.handles.hasAvailable() == false) {
|
const conn = self.network.getConnection() orelse {
|
||||||
|
self.queue.prepend(queue_node);
|
||||||
break;
|
break;
|
||||||
}
|
};
|
||||||
const queue_node = self.queue.popFirst() orelse break;
|
|
||||||
const transfer: *Transfer = @fieldParentPtr("_node", queue_node);
|
const transfer: *Transfer = @fieldParentPtr("_node", queue_node);
|
||||||
|
|
||||||
// we know this exists, because we checked hasAvailable() above
|
|
||||||
const conn = self.handles.get().?;
|
|
||||||
try self.makeRequest(conn, transfer);
|
try self.makeRequest(conn, transfer);
|
||||||
}
|
}
|
||||||
return self.perform(@intCast(timeout_ms));
|
return self.perform(@intCast(timeout_ms));
|
||||||
@@ -529,8 +533,8 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool {
|
|||||||
fn process(self: *Client, transfer: *Transfer) !void {
|
fn process(self: *Client, transfer: *Transfer) !void {
|
||||||
// libcurl doesn't allow recursive calls, if we're in a `perform()` operation
|
// libcurl doesn't allow recursive calls, if we're in a `perform()` operation
|
||||||
// then we _have_ to queue this.
|
// then we _have_ to queue this.
|
||||||
if (self.handles.performing == false) {
|
if (self.performing == false) {
|
||||||
if (self.handles.get()) |conn| {
|
if (self.network.getConnection()) |conn| {
|
||||||
return self.makeRequest(conn, transfer);
|
return self.makeRequest(conn, transfer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -644,10 +648,7 @@ fn requestFailed(transfer: *Transfer, err: anyerror, comptime execute_callback:
|
|||||||
// can be changed at any point in the easy's lifecycle.
|
// can be changed at any point in the easy's lifecycle.
|
||||||
pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void {
|
pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void {
|
||||||
try self.ensureNoActiveConnection();
|
try self.ensureNoActiveConnection();
|
||||||
|
self.http_proxy = proxy;
|
||||||
for (self.handles.connections) |*conn| {
|
|
||||||
try conn.setProxy(proxy.ptr);
|
|
||||||
}
|
|
||||||
self.use_proxy = true;
|
self.use_proxy = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -656,31 +657,21 @@ pub fn changeProxy(self: *Client, proxy: [:0]const u8) !void {
|
|||||||
pub fn restoreOriginalProxy(self: *Client) !void {
|
pub fn restoreOriginalProxy(self: *Client) !void {
|
||||||
try self.ensureNoActiveConnection();
|
try self.ensureNoActiveConnection();
|
||||||
|
|
||||||
const proxy = if (self.http_proxy) |p| p.ptr else null;
|
self.http_proxy = self.network.config.httpProxy();
|
||||||
for (self.handles.connections) |*conn| {
|
self.use_proxy = self.http_proxy != null;
|
||||||
try conn.setProxy(proxy);
|
|
||||||
}
|
|
||||||
self.use_proxy = proxy != null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enable TLS verification on all connections.
|
// Enable TLS verification on all connections.
|
||||||
pub fn enableTlsVerify(self: *Client) !void {
|
pub fn setTlsVerify(self: *Client, verify: bool) !void {
|
||||||
// Remove inflight connections check on enable TLS b/c chromiumoxide calls
|
// Remove inflight connections check on enable TLS b/c chromiumoxide calls
|
||||||
// the command during navigate and Curl seems to accept it...
|
// the command during navigate and Curl seems to accept it...
|
||||||
|
|
||||||
for (self.handles.connections) |*conn| {
|
var it = self.in_use.first;
|
||||||
try conn.setTlsVerify(true, self.use_proxy);
|
while (it) |node| : (it = node.next) {
|
||||||
}
|
const conn: *Net.Connection = @fieldParentPtr("node", node);
|
||||||
}
|
try conn.setTlsVerify(verify, self.use_proxy);
|
||||||
|
|
||||||
// Disable TLS verification on all connections.
|
|
||||||
pub fn disableTlsVerify(self: *Client) !void {
|
|
||||||
// Remove inflight connections check on disable TLS b/c chromiumoxide calls
|
|
||||||
// the command during navigate and Curl seems to accept it...
|
|
||||||
|
|
||||||
for (self.handles.connections) |*conn| {
|
|
||||||
try conn.setTlsVerify(false, self.use_proxy);
|
|
||||||
}
|
}
|
||||||
|
self.tls_verify = verify;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerror!void {
|
fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerror!void {
|
||||||
@@ -691,9 +682,14 @@ fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerr
|
|||||||
errdefer {
|
errdefer {
|
||||||
transfer._conn = null;
|
transfer._conn = null;
|
||||||
transfer.deinit();
|
transfer.deinit();
|
||||||
self.handles.isAvailable(conn);
|
self.releaseConn(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set callbacks and per-client settings on the pooled connection.
|
||||||
|
try conn.setCallbacks(Transfer.headerCallback, Transfer.dataCallback);
|
||||||
|
try conn.setProxy(self.http_proxy);
|
||||||
|
try conn.setTlsVerify(self.tls_verify, self.use_proxy);
|
||||||
|
|
||||||
try conn.setURL(req.url);
|
try conn.setURL(req.url);
|
||||||
try conn.setMethod(req.method);
|
try conn.setMethod(req.method);
|
||||||
if (req.body) |b| {
|
if (req.body) |b| {
|
||||||
@@ -728,10 +724,12 @@ fn makeRequest(self: *Client, conn: *Net.Connection, transfer: *Transfer) anyerr
|
|||||||
// fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do
|
// fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do
|
||||||
// cleanup. But if things fail after `curl_multi_add_handle`, we expect
|
// cleanup. But if things fail after `curl_multi_add_handle`, we expect
|
||||||
// perfom to pickup the failure and cleanup.
|
// perfom to pickup the failure and cleanup.
|
||||||
|
self.in_use.append(&conn.node);
|
||||||
self.handles.add(conn) catch |err| {
|
self.handles.add(conn) catch |err| {
|
||||||
transfer._conn = null;
|
transfer._conn = null;
|
||||||
transfer.deinit();
|
transfer.deinit();
|
||||||
self.handles.isAvailable(conn);
|
self.in_use.remove(&conn.node);
|
||||||
|
self.releaseConn(conn);
|
||||||
return err;
|
return err;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -752,7 +750,22 @@ pub const PerformStatus = enum {
|
|||||||
};
|
};
|
||||||
|
|
||||||
fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
|
fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
|
||||||
const running = try self.handles.perform();
|
const running = blk: {
|
||||||
|
self.performing = true;
|
||||||
|
defer self.performing = false;
|
||||||
|
|
||||||
|
break :blk try self.handles.perform();
|
||||||
|
};
|
||||||
|
|
||||||
|
// Process dirty connections — return them to Runtime pool.
|
||||||
|
while (self.dirty.popFirst()) |node| {
|
||||||
|
const conn: *Net.Connection = @fieldParentPtr("node", node);
|
||||||
|
self.handles.remove(conn) catch |err| {
|
||||||
|
log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" });
|
||||||
|
@panic("multi_remove_handle");
|
||||||
|
};
|
||||||
|
self.releaseConn(conn);
|
||||||
|
}
|
||||||
|
|
||||||
// We're potentially going to block for a while until we get data. Process
|
// We're potentially going to block for a while until we get data. Process
|
||||||
// whatever messages we have waiting ahead of time.
|
// whatever messages we have waiting ahead of time.
|
||||||
@@ -871,11 +884,26 @@ fn processMessages(self: *Client) !bool {
|
|||||||
|
|
||||||
fn endTransfer(self: *Client, transfer: *Transfer) void {
|
fn endTransfer(self: *Client, transfer: *Transfer) void {
|
||||||
const conn = transfer._conn.?;
|
const conn = transfer._conn.?;
|
||||||
self.handles.remove(conn);
|
self.removeConn(conn);
|
||||||
transfer._conn = null;
|
transfer._conn = null;
|
||||||
self.active -= 1;
|
self.active -= 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn removeConn(self: *Client, conn: *Net.Connection) void {
|
||||||
|
self.in_use.remove(&conn.node);
|
||||||
|
if (self.handles.remove(conn)) {
|
||||||
|
self.releaseConn(conn);
|
||||||
|
} else |_| {
|
||||||
|
// Can happen if we're in a perform() call, so we'll queue this
|
||||||
|
// for cleanup later.
|
||||||
|
self.dirty.append(&conn.node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn releaseConn(self: *Client, conn: *Net.Connection) void {
|
||||||
|
self.network.releaseConnection(conn);
|
||||||
|
}
|
||||||
|
|
||||||
fn ensureNoActiveConnection(self: *const Client) !void {
|
fn ensureNoActiveConnection(self: *const Client) !void {
|
||||||
if (self.active > 0) {
|
if (self.active > 0) {
|
||||||
return error.InflightConnection;
|
return error.InflightConnection;
|
||||||
@@ -1023,7 +1051,7 @@ pub const Transfer = struct {
|
|||||||
fn deinit(self: *Transfer) void {
|
fn deinit(self: *Transfer) void {
|
||||||
self.req.headers.deinit();
|
self.req.headers.deinit();
|
||||||
if (self._conn) |conn| {
|
if (self._conn) |conn| {
|
||||||
self.client.handles.remove(conn);
|
self.client.removeConn(conn);
|
||||||
}
|
}
|
||||||
self.arena.deinit();
|
self.arena.deinit();
|
||||||
self.client.transfer_pool.destroy(self);
|
self.client.transfer_pool.destroy(self);
|
||||||
@@ -1093,7 +1121,7 @@ pub const Transfer = struct {
|
|||||||
requestFailed(self, err, true);
|
requestFailed(self, err, true);
|
||||||
|
|
||||||
const client = self.client;
|
const client = self.client;
|
||||||
if (self._performing or client.handles.performing) {
|
if (self._performing or client.performing) {
|
||||||
// We're currently in a curl_multi_perform. We cannot call endTransfer
|
// We're currently in a curl_multi_perform. We cannot call endTransfer
|
||||||
// as that calls curl_multi_remove_handle, and you can't do that
|
// as that calls curl_multi_remove_handle, and you can't do that
|
||||||
// from a curl callback. Instead, we flag this transfer and all of
|
// from a curl callback. Instead, we flag this transfer and all of
|
||||||
|
|||||||
@@ -35,12 +35,7 @@ fn setIgnoreCertificateErrors(cmd: anytype) !void {
|
|||||||
ignore: bool,
|
ignore: bool,
|
||||||
})) orelse return error.InvalidParams;
|
})) orelse return error.InvalidParams;
|
||||||
|
|
||||||
if (params.ignore) {
|
try cmd.cdp.browser.http_client.setTlsVerify(!params.ignore);
|
||||||
try cmd.cdp.browser.http_client.disableTlsVerify();
|
|
||||||
} else {
|
|
||||||
try cmd.cdp.browser.http_client.enableTlsVerify();
|
|
||||||
}
|
|
||||||
|
|
||||||
return cmd.sendResult(null, .{});
|
return cmd.sendResult(null, .{});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,10 @@ config: *const Config,
|
|||||||
ca_blob: ?net_http.Blob,
|
ca_blob: ?net_http.Blob,
|
||||||
robot_store: RobotStore,
|
robot_store: RobotStore,
|
||||||
|
|
||||||
|
connections: []net_http.Connection,
|
||||||
|
available: std.DoublyLinkedList = .{},
|
||||||
|
conn_mutex: std.Thread.Mutex = .{},
|
||||||
|
|
||||||
pollfds: []posix.pollfd,
|
pollfds: []posix.pollfd,
|
||||||
listener: ?Listener = null,
|
listener: ?Listener = null,
|
||||||
|
|
||||||
@@ -191,11 +195,23 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime {
|
|||||||
ca_blob = try loadCerts(allocator);
|
ca_blob = try loadCerts(allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const count: usize = config.httpMaxConcurrent();
|
||||||
|
const connections = try allocator.alloc(net_http.Connection, count);
|
||||||
|
errdefer allocator.free(connections);
|
||||||
|
|
||||||
|
var available: std.DoublyLinkedList = .{};
|
||||||
|
for (0..count) |i| {
|
||||||
|
connections[i] = try net_http.Connection.init(ca_blob, config);
|
||||||
|
available.append(&connections[i].node);
|
||||||
|
}
|
||||||
|
|
||||||
return .{
|
return .{
|
||||||
.allocator = allocator,
|
.allocator = allocator,
|
||||||
.config = config,
|
.config = config,
|
||||||
.ca_blob = ca_blob,
|
.ca_blob = ca_blob,
|
||||||
.robot_store = RobotStore.init(allocator),
|
.robot_store = RobotStore.init(allocator),
|
||||||
|
.connections = connections,
|
||||||
|
.available = available,
|
||||||
.pollfds = pollfds,
|
.pollfds = pollfds,
|
||||||
.wakeup_pipe = pipe,
|
.wakeup_pipe = pipe,
|
||||||
};
|
};
|
||||||
@@ -216,6 +232,11 @@ pub fn deinit(self: *Runtime) void {
|
|||||||
self.allocator.free(data[0..ca_blob.len]);
|
self.allocator.free(data[0..ca_blob.len]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (self.connections) |*conn| {
|
||||||
|
conn.deinit();
|
||||||
|
}
|
||||||
|
self.allocator.free(self.connections);
|
||||||
|
|
||||||
self.robot_store.deinit();
|
self.robot_store.deinit();
|
||||||
|
|
||||||
globalDeinit();
|
globalDeinit();
|
||||||
@@ -310,6 +331,25 @@ pub fn stop(self: *Runtime) void {
|
|||||||
_ = posix.write(self.wakeup_pipe[1], &.{1}) catch {};
|
_ = posix.write(self.wakeup_pipe[1], &.{1}) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn getConnection(self: *Runtime) ?*net_http.Connection {
|
||||||
|
self.conn_mutex.lock();
|
||||||
|
defer self.conn_mutex.unlock();
|
||||||
|
|
||||||
|
const node = self.available.popFirst() orelse return null;
|
||||||
|
return @fieldParentPtr("node", node);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn releaseConnection(self: *Runtime, conn: *net_http.Connection) void {
|
||||||
|
conn.reset() catch |err| {
|
||||||
|
lp.assert(false, "couldn't reset curl easy", .{ .err = err });
|
||||||
|
};
|
||||||
|
|
||||||
|
self.conn_mutex.lock();
|
||||||
|
defer self.conn_mutex.unlock();
|
||||||
|
|
||||||
|
self.available.append(&conn.node);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn newConnection(self: *Runtime) !net_http.Connection {
|
pub fn newConnection(self: *Runtime) !net_http.Connection {
|
||||||
return net_http.Connection.init(self.ca_blob, self.config);
|
return net_http.Connection.init(self.ca_blob, self.config);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -237,7 +237,7 @@ pub const ResponseHead = struct {
|
|||||||
|
|
||||||
pub const Connection = struct {
|
pub const Connection = struct {
|
||||||
easy: *libcurl.Curl,
|
easy: *libcurl.Curl,
|
||||||
node: Handles.HandleList.Node = .{},
|
node: std.DoublyLinkedList.Node = .{},
|
||||||
|
|
||||||
pub fn init(
|
pub fn init(
|
||||||
ca_blob_: ?libcurl.CurlBlob,
|
ca_blob_: ?libcurl.CurlBlob,
|
||||||
@@ -385,8 +385,16 @@ pub const Connection = struct {
|
|||||||
try libcurl.curl_easy_setopt(self.easy, .write_function, data_cb);
|
try libcurl.curl_easy_setopt(self.easy, .write_function, data_cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setProxy(self: *const Connection, proxy: ?[*:0]const u8) !void {
|
pub fn reset(self: *const Connection) !void {
|
||||||
try libcurl.curl_easy_setopt(self.easy, .proxy, proxy);
|
try libcurl.curl_easy_setopt(self.easy, .header_data, null);
|
||||||
|
try libcurl.curl_easy_setopt(self.easy, .header_function, null);
|
||||||
|
try libcurl.curl_easy_setopt(self.easy, .write_data, null);
|
||||||
|
try libcurl.curl_easy_setopt(self.easy, .write_function, null);
|
||||||
|
try libcurl.curl_easy_setopt(self.easy, .proxy, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn setProxy(self: *const Connection, proxy: ?[:0]const u8) !void {
|
||||||
|
try libcurl.curl_easy_setopt(self.easy, .proxy, if (proxy) |p| p.ptr else null);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setTlsVerify(self: *const Connection, verify: bool, use_proxy: bool) !void {
|
pub fn setTlsVerify(self: *const Connection, verify: bool, use_proxy: bool) !void {
|
||||||
@@ -467,111 +475,32 @@ pub const Connection = struct {
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub const Handles = struct {
|
pub const Handles = struct {
|
||||||
connections: []Connection,
|
|
||||||
dirty: HandleList,
|
|
||||||
in_use: HandleList,
|
|
||||||
available: HandleList,
|
|
||||||
multi: *libcurl.CurlM,
|
multi: *libcurl.CurlM,
|
||||||
performing: bool = false,
|
|
||||||
|
|
||||||
pub const HandleList = std.DoublyLinkedList;
|
|
||||||
|
|
||||||
pub fn init(
|
|
||||||
allocator: Allocator,
|
|
||||||
ca_blob: ?libcurl.CurlBlob,
|
|
||||||
config: *const Config,
|
|
||||||
) !Handles {
|
|
||||||
const count: usize = config.httpMaxConcurrent();
|
|
||||||
if (count == 0) return error.InvalidMaxConcurrent;
|
|
||||||
|
|
||||||
|
pub fn init(config: *const Config) !Handles {
|
||||||
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
|
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
|
||||||
errdefer libcurl.curl_multi_cleanup(multi) catch {};
|
errdefer libcurl.curl_multi_cleanup(multi) catch {};
|
||||||
|
|
||||||
try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen());
|
try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen());
|
||||||
|
|
||||||
const connections = try allocator.alloc(Connection, count);
|
return .{ .multi = multi };
|
||||||
errdefer allocator.free(connections);
|
|
||||||
|
|
||||||
var available: HandleList = .{};
|
|
||||||
for (0..count) |i| {
|
|
||||||
connections[i] = try Connection.init(ca_blob, config);
|
|
||||||
available.append(&connections[i].node);
|
|
||||||
}
|
|
||||||
|
|
||||||
return .{
|
|
||||||
.dirty = .{},
|
|
||||||
.in_use = .{},
|
|
||||||
.connections = connections,
|
|
||||||
.available = available,
|
|
||||||
.multi = multi,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn deinit(self: *Handles, allocator: Allocator) void {
|
pub fn deinit(self: *Handles) void {
|
||||||
for (self.connections) |*conn| {
|
|
||||||
conn.deinit();
|
|
||||||
}
|
|
||||||
allocator.free(self.connections);
|
|
||||||
libcurl.curl_multi_cleanup(self.multi) catch {};
|
libcurl.curl_multi_cleanup(self.multi) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn hasAvailable(self: *const Handles) bool {
|
|
||||||
return self.available.first != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get(self: *Handles) ?*Connection {
|
|
||||||
if (self.available.popFirst()) |node| {
|
|
||||||
self.in_use.append(node);
|
|
||||||
return @as(*Connection, @fieldParentPtr("node", node));
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add(self: *Handles, conn: *const Connection) !void {
|
pub fn add(self: *Handles, conn: *const Connection) !void {
|
||||||
try libcurl.curl_multi_add_handle(self.multi, conn.easy);
|
try libcurl.curl_multi_add_handle(self.multi, conn.easy);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(self: *Handles, conn: *Connection) void {
|
pub fn remove(self: *Handles, conn: *const Connection) !void {
|
||||||
if (libcurl.curl_multi_remove_handle(self.multi, conn.easy)) {
|
try libcurl.curl_multi_remove_handle(self.multi, conn.easy);
|
||||||
self.isAvailable(conn);
|
|
||||||
} else |err| {
|
|
||||||
// can happen if we're in a perform() call, so we'll queue this
|
|
||||||
// for cleanup later.
|
|
||||||
const node = &conn.node;
|
|
||||||
self.in_use.remove(node);
|
|
||||||
self.dirty.append(node);
|
|
||||||
log.warn(.http, "multi remove handle", .{ .err = err });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn isAvailable(self: *Handles, conn: *Connection) void {
|
|
||||||
const node = &conn.node;
|
|
||||||
self.in_use.remove(node);
|
|
||||||
self.available.append(node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn perform(self: *Handles) !c_int {
|
pub fn perform(self: *Handles) !c_int {
|
||||||
self.performing = true;
|
|
||||||
defer self.performing = false;
|
|
||||||
|
|
||||||
const multi = self.multi;
|
|
||||||
var running: c_int = undefined;
|
var running: c_int = undefined;
|
||||||
try libcurl.curl_multi_perform(self.multi, &running);
|
try libcurl.curl_multi_perform(self.multi, &running);
|
||||||
|
|
||||||
{
|
|
||||||
const list = &self.dirty;
|
|
||||||
while (list.first) |node| {
|
|
||||||
list.remove(node);
|
|
||||||
const conn: *Connection = @fieldParentPtr("node", node);
|
|
||||||
if (libcurl.curl_multi_remove_handle(multi, conn.easy)) {
|
|
||||||
self.available.append(node);
|
|
||||||
} else |err| {
|
|
||||||
log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" });
|
|
||||||
@panic("multi_remove_handle");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return running;
|
return running;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -590,7 +590,10 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype
|
|||||||
.header_data,
|
.header_data,
|
||||||
.write_data,
|
.write_data,
|
||||||
=> blk: {
|
=> blk: {
|
||||||
const ptr: *anyopaque = @ptrCast(value);
|
const ptr: ?*anyopaque = switch (@typeInfo(@TypeOf(value))) {
|
||||||
|
.null => null,
|
||||||
|
else => @ptrCast(value),
|
||||||
|
};
|
||||||
break :blk c.curl_easy_setopt(easy, opt, ptr);
|
break :blk c.curl_easy_setopt(easy, opt, ptr);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user