Create Client.Transfer earlier.

On client.request(req) we now immediately wrap the request into a Transfer. This
results in less copying of the Request object. It also makes the transfer.uri
available, so CDP no longer needs to call std.Uri.parse(request.url) itself.

The main advantage is that it's easier to manage resources. There was a
use-after-free before due to the sensitive nature of the transfer's lifetime.
There were also corner cases where some resources might not be freed. This is
hopefully fixed with the lifetime of Transfer being extended.
This commit is contained in:
Karl Seguin
2025-08-13 18:05:00 +08:00
parent 2dc09c799f
commit ca9e850ac7
5 changed files with 144 additions and 108 deletions

View File

@@ -64,10 +64,10 @@ handles: Handles,
next_request_id: u64 = 0,
// When handles has no more available easys, requests get queued.
queue: RequestQueue,
queue: TransferQueue,
// Memory pool for Queue nodes.
queue_node_pool: std.heap.MemoryPool(RequestQueue.Node),
queue_node_pool: std.heap.MemoryPool(TransferQueue.Node),
// The main app allocator
allocator: Allocator,
@@ -93,13 +93,13 @@ arena: ArenaAllocator,
// restoring, this originally-configured value is what it goes to.
http_proxy: ?[:0]const u8 = null,
const RequestQueue = std.DoublyLinkedList(Request);
const TransferQueue = std.DoublyLinkedList(*Transfer);
pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, opts: Http.Opts) !*Client {
var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator);
errdefer transfer_pool.deinit();
var queue_node_pool = std.heap.MemoryPool(RequestQueue.Node).init(allocator);
var queue_node_pool = std.heap.MemoryPool(TransferQueue.Node).init(allocator);
errdefer queue_node_pool.deinit();
const client = try allocator.create(Client);
@@ -151,8 +151,7 @@ pub fn abort(self: *Client) void {
log.err(.http, "get private info", .{ .err = err, .source = "abort" });
continue;
};
self.requestFailed(&transfer.req, error.Abort);
self.endTransfer(transfer);
transfer.abort();
}
std.debug.assert(self.active == 0);
@@ -193,46 +192,83 @@ pub fn tick(self: *Client, timeout_ms: usize) !void {
}
pub fn request(self: *Client, req: Request) !void {
var req_copy = req; // We need it mutable
const transfer = try self.makeTransfer(req);
if (req_copy.id == null) { // If the ID has already been set that means the request was previously intercepted
req_copy.id = self.next_request_id;
self.next_request_id += 1;
if (self.notification) |notification| {
notification.dispatch(.http_request_start, &.{ .request = &req_copy });
if (self.notification) |notification| {
notification.dispatch(.http_request_start, &.{ .transfer = transfer });
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .request = &req_copy, .wait_for_interception = &wait_for_interception });
if (wait_for_interception) return; // The user is send an invitation to intercept this request.
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception) {
// The user is sent an invitation to intercept this request.
return;
}
}
return self.process(transfer);
}
// Above, request will not process if there's an interception request. In such
// cases, the interceptor is expected to call process to continue the transfer
// or transfer.abort() to abort it.
pub fn process(self: *Client, transfer: *Transfer) !void {
if (self.handles.getFreeHandle()) |handle| {
return self.makeRequest(handle, req_copy);
return self.makeRequest(handle, transfer);
}
const node = try self.queue_node_pool.create();
node.data = req_copy;
node.data = transfer;
self.queue.append(node);
}
// See ScriptManager.blockingGet
pub fn blockingRequest(self: *Client, req: Request) !void {
return self.makeRequest(&self.blocking, req);
const transfer = try self.makeTransfer(req);
return self.makeRequest(&self.blocking, transfer);
}
fn requestFailed(self: *Client, req: *Request, err: anyerror) void {
if (req._notified_fail) return;
req._notified_fail = true;
/// Wraps `req` in a Transfer allocated from `self.transfer_pool`.
///
/// The request URL is parsed up front (the parsed URI is needed for cookies);
/// an unparsable URL is logged and its error returned. On any error path the
/// request's headers are released via `req.headers.deinit()`. On success,
/// `self.next_request_id` is incremented and the new value becomes the
/// transfer's id.
fn makeTransfer(self: *Client, req: Request) !*Transfer {
    // From here on, a failure means the request never becomes a transfer,
    // so its headers are released here.
    errdefer req.headers.deinit();

    // we need this for cookies
    const parsed_uri = std.Uri.parse(req.url) catch |err| {
        log.warn(.http, "invalid url", .{ .err = err, .url = req.url });
        return err;
    };

    const new_transfer = try self.transfer_pool.create();
    errdefer self.transfer_pool.destroy(new_transfer);

    // Ids are handed out only after the allocation has succeeded.
    self.next_request_id += 1;

    new_transfer.* = .{
        .id = self.next_request_id,
        .uri = parsed_uri,
        .req = req,
        .ctx = req.ctx,
        .client = self,
    };
    return new_transfer;
}
fn requestFailed(self: *Client, transfer: *Transfer, err: anyerror) void {
// this shouldn't happen, we'll crash in debug mode. But in release, we'll
// just noop this state.
std.debug.assert(transfer._notified_fail == false);
if (transfer._notified_fail) {
return;
}
transfer._notified_fail = true;
if (self.notification) |notification| {
notification.dispatch(.http_request_fail, &.{
.request = req,
.transfer = transfer,
.err = err,
});
}
req.error_callback(req.ctx, err);
transfer.req.error_callback(transfer.ctx, err);
}
// Restrictive since it'll only work if there are no inflight requests. In some
@@ -265,54 +301,40 @@ pub fn restoreOriginalProxy(self: *Client) !void {
try errorCheck(c.curl_easy_setopt(self.blocking.conn.easy, c.CURLOPT_PROXY, proxy));
}
fn makeRequest(self: *Client, handle: *Handle, req: Request) !void {
fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
const conn = handle.conn;
const easy = conn.easy;
const req = &transfer.req;
// we need this for cookies
const uri = std.Uri.parse(req.url) catch |err| {
self.handles.release(handle);
log.warn(.http, "invalid url", .{ .err = err, .url = req.url });
return;
};
var header_list = req.headers;
{
errdefer self.handles.release(handle);
try conn.setMethod(req.method);
try conn.setURL(req.url);
transfer._handle = handle;
errdefer transfer.deinit();
try conn.setURL(req.url);
try conn.setMethod(req.method);
if (req.body) |b| {
try conn.setBody(b);
}
var header_list = req.headers;
try conn.secretHeaders(&header_list); // Add headers that must be hidden from intercepts
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_HTTPHEADER, header_list.headers));
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_PRIVATE, transfer));
}
{
errdefer self.handles.release(handle);
// As soon as this is called, our "perform" loop is responsible for
// cleaning things up. That's why the above code is in a block. If anything
// fails BEFORE `curl_multi_add_handle` succeeds, then we still need to do
// cleanup. But if things fail after `curl_multi_add_handle`, we expect
// perform to pick up the failure and clean up.
try errorMCheck(c.curl_multi_add_handle(self.multi, easy));
const transfer = try self.transfer_pool.create();
transfer.* = .{
.id = 0,
.uri = uri,
.req = req,
.ctx = req.ctx,
.handle = handle,
._request_header_list = header_list.headers,
if (req.start_callback) |cb| {
cb(transfer) catch |err| {
try errorMCheck(c.curl_multi_remove_handle(self.multi, easy));
transfer.deinit();
return err;
};
errdefer self.transfer_pool.destroy(transfer);
try errorCheck(c.curl_easy_setopt(easy, c.CURLOPT_PRIVATE, transfer));
try errorMCheck(c.curl_multi_add_handle(self.multi, easy));
if (req.start_callback) |cb| {
cb(transfer) catch |err| {
try errorMCheck(c.curl_multi_remove_handle(self.multi, easy));
return err;
};
}
}
self.active += 1;
@@ -336,38 +358,36 @@ fn perform(self: *Client, timeout_ms: c_int) !void {
std.debug.assert(msg.msg == c.CURLMSG_DONE);
const easy = msg.easy_handle.?;
const transfer = try Transfer.fromEasy(easy);
const ctx = transfer.ctx;
const done_callback = transfer.req.done_callback;
// release it ASAP so that it's available; some done_callbacks
// will load more resources.
self.endTransfer(transfer);
defer transfer.deinit();
if (errorCheck(msg.data.result)) {
done_callback(ctx) catch |err| {
transfer.req.done_callback(transfer.ctx) catch |err| {
// transfer isn't valid at this point, don't use it.
log.err(.http, "done_callback", .{ .err = err });
self.requestFailed(&transfer.req, err);
self.requestFailed(transfer, err);
};
// self.requestComplete(transfer);
} else |err| {
self.requestFailed(&transfer.req, err);
self.requestFailed(transfer, err);
}
}
}
fn endTransfer(self: *Client, transfer: *Transfer) void {
const handle = transfer.handle;
transfer.deinit();
self.transfer_pool.destroy(transfer);
const handle = transfer._handle.?;
errorMCheck(c.curl_multi_remove_handle(self.multi, handle.conn.easy)) catch |err| {
log.fatal(.http, "Failed to remove handle", .{ .err = err });
};
self.handles.release(handle);
transfer._handle = null;
self.active -= 1;
}
@@ -497,50 +517,49 @@ pub const RequestCookie = struct {
};
pub const Request = struct {
id: ?u64 = null,
method: Method,
url: [:0]const u8,
headers: Headers,
body: ?[]const u8 = null,
cookie_jar: *storage.CookieJar,
_notified_fail: bool = false,
// arbitrary data that can be associated with this request
ctx: *anyopaque = undefined,
start_callback: ?*const fn (req: *Transfer) anyerror!void = null,
header_callback: ?*const fn (req: *Transfer, header: []const u8) anyerror!void = null,
header_done_callback: *const fn (req: *Transfer) anyerror!void,
data_callback: *const fn (req: *Transfer, data: []const u8) anyerror!void,
start_callback: ?*const fn (transfer: *Transfer) anyerror!void = null,
header_callback: ?*const fn (transfer: *Transfer, header: []const u8) anyerror!void = null,
header_done_callback: *const fn (transfer: *Transfer) anyerror!void,
data_callback: *const fn (transfer: *Transfer, data: []const u8) anyerror!void,
done_callback: *const fn (ctx: *anyopaque) anyerror!void,
error_callback: *const fn (ctx: *anyopaque, err: anyerror) void,
};
pub const Transfer = struct {
id: usize,
id: usize = 0,
req: Request,
ctx: *anyopaque,
uri: std.Uri, // used for setting/getting the cookie
ctx: *anyopaque, // copied from req.ctx to make it easier for callback handlers
client: *Client,
_notified_fail: bool = false,
// We'll store the response header here
response_header: ?Header = null,
handle: *Handle,
_handle: ?*Handle = null,
_redirecting: bool = false,
// needs to be freed when we're done
_request_header_list: ?*c.curl_slist = null,
fn deinit(self: *Transfer) void {
if (self._request_header_list) |list| {
c.curl_slist_free_all(list);
self.req.headers.deinit();
if (self._handle) |handle| {
self.client.handles.release(handle);
}
self.client.transfer_pool.destroy(self);
}
pub fn format(self: *const Transfer, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
const req = self.req;
return writer.print("[{d}] {s} {s}", .{ self.id, @tagName(req.method), req.url });
return writer.print("{s} {s}", .{ @tagName(req.method), req.url });
}
pub fn setBody(self: *Transfer, body: []const u8) !void {
@@ -553,8 +572,20 @@ pub const Transfer = struct {
self._request_header_list = c.curl_slist_append(self._request_header_list, value);
}
/// Points this transfer at `url`.
///
/// The URL is parsed first; if parsing fails the error is returned and
/// neither `uri` nor `req.url` is modified.
pub fn updateURL(self: *Transfer, url: [:0]const u8) !void {
    const parsed_uri = try std.Uri.parse(url);

    // `uri` is what the cookie handling reads
    self.uri = parsed_uri;
    // `req.url` is what the request itself uses
    self.req.url = url;
}
pub fn abort(self: *Transfer) void {
self.handle.client.endTransfer(self);
self.client.requestFailed(self, error.Abort);
if (self._handle != null) {
self.client.endTransfer(self);
}
self.deinit();
}
fn headerCallback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) callconv(.c) usize {