Rework request interception for Zigdom

Zigdom broke request interception. It isn't zigdom specifically, but in zigdom
we properly block the parser when executing a normal (not async, not defer)
script. This does not work well with request interception, because an
intercepted request isn't blocked on HTTP data, it's blocked on a message from
CDP. Generally, neither our Page nor ScriptManager are CDP-aware. And, even if
they were, it would be hard to break out of our parsing and return control to
the CDP server.

To fix this, we expand on the HTTP Client's basic awareness of CDP (via its
extra_socket field). The HTTP client is now able to block until an intercepted
request is continued/aborted/fulfilled. It does this by being able to ask the
CDP client to read/process data.

This does not yet work for intercepted authentication requests.
This commit is contained in:
Karl Seguin
2025-12-23 19:06:26 +08:00
parent 83f008de1f
commit 67875036c5
6 changed files with 201 additions and 69 deletions

View File

@@ -34,6 +34,8 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const errorCheck = Http.errorCheck;
const errorMCheck = Http.errorMCheck;
const IS_DEBUG = builtin.mode == .Debug;
const Method = Http.Method;
// This is loosely tied to a browser Page. Loading all the <scripts>, doing
@@ -100,10 +102,25 @@ use_proxy: bool,
// The complete user-agent header line
user_agent: [:0]const u8,
// libcurl can monitor arbitrary sockets. Currently, we ever [maybe] want to
// monitor the CDP client socket, so we've done the simplest thing possible
// by having this single optional field
extra_socket: ?posix.socket_t = null,
cdp_client: ?CDPClient = null,
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
// both HTTP data as well as messages from a CDP connection.
// Furthermore, we have some tension between blocking scripts and request
// interception. For non-blocking scripts, because nothing blocks, we can
// just queue the scripts until we receive a response to the interception
// notification. But for blocking scripts (which block the parser), it's hard
// to return control back to the CDP loop. So the `read` function pointer is
// used by the Client to have the CDP client read more data from the socket,
// specifically when we're waiting for a request interception response to
// a blocking script.
// Handle through which the HTTP Client reaches the CDP connection: a socket
// to poll plus three callbacks, each of which is invoked with `ctx`.
pub const CDPClient = struct {
// Polled for readability (CURL_WAIT_POLLIN) together with the HTTP transfers
// in `perform`.
socket: posix.socket_t,
// Opaque pointer handed back, unchanged, as the only argument of the
// callbacks below.
ctx: *anyopaque,
// Called once by `request` before it starts waiting on a blocking,
// intercepted request. A `false` return makes `request` fail with
// error.BlockingInterceptFailure.
blocking_read_start: *const fn (*anyopaque) bool,
// Called repeatedly by `request` for as long as the transfer's intercept
// state is still `.pending`. A `false` return makes `request` fail with
// error.BlockingInterceptFailure.
blocking_read: *const fn (*anyopaque) bool,
// Counterpart of blocking_read_start; `request` treats a `false` return as
// error.BlockingInterceptFailure.
// NOTE(review): in `request` this call sits after a `while (true)` loop whose
// arms all return or continue - confirm it is actually reachable.
blocking_read_end: *const fn (*anyopaque) bool,
};
const TransferQueue = std.DoublyLinkedList;
@@ -175,7 +192,7 @@ pub fn abort(self: *Client) void {
// We can remove some (all?) of these once we're confident it's right.
std.debug.assert(self.handles.in_use.first == null);
std.debug.assert(self.handles.available.len() == self.handles.handles.len);
if (builtin.mode == .Debug) {
if (comptime IS_DEBUG) {
var running: c_int = undefined;
std.debug.assert(c.curl_multi_perform(self.multi, &running) == c.CURLE_OK);
std.debug.assert(running == 0);
@@ -200,23 +217,71 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
pub fn request(self: *Client, req: Request) !void {
const transfer = try self.makeTransfer(req);
if (self.notification) |notification| {
notification.dispatch(.http_request_start, &.{ .transfer = transfer });
const notification = self.notification orelse return self.process(transfer);
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception) {
self.intercepted += 1;
log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted });
if (builtin.mode == .Debug) {
transfer._intercepted = true;
}
// The user is send an invitation to intercept this request.
return;
notification.dispatch(.http_request_start, &.{ .transfer = transfer });
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception == false) {
// request not intercepted, process it normally
return self.process(transfer);
}
self.intercepted += 1;
if (comptime IS_DEBUG) {
log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted });
}
transfer._intercept_state = .pending;
if (req.blocking == false) {
// The request was intercepted, but it isn't a blocking request, so we
// don't need to block this call. The request will be unblocked
// asynchronously via either continueTransfer or abortTransfer
return;
}
// The request was intercepted and is blocking. This is messy, but our
// callers, the ScriptManager -> Page, don't have a great way to stop the
// parser and return control to the CDP server to wait for the interception
// response. We have some information on the CDPClient, so we'll do the
// blocking here. (This is a bit of a legacy thing. Initially the Client
// had an 'extra_socket' that it could monitor. It was named 'extra_socket'
// to appear generic, but really, that 'extra_socket' was always the CDP
// socket that we could monitor in libcurl. Because we already had
// the "extra_socket" here, it was easier just to make it even more CDP-
// aware and turn `extra_socket: socket_t` into the current CDPClient).
const cdp_client = self.cdp_client.?;
const ctx = cdp_client.ctx;
if (cdp_client.blocking_read_start(ctx) == false) {
return error.BlockingInterceptFailure;
}
while (true) {
if (cdp_client.blocking_read(ctx) == false) {
return error.BlockingInterceptFailure;
}
switch (transfer._intercept_state) {
.pending => continue, // keep waiting
.@"continue" => return self.process(transfer),
.abort => {
transfer.abort();
return;
},
.fulfilled => {
// callbacks already called, just need to clean up
transfer.deinit();
return;
},
.not_intercepted => unreachable,
}
}
return self.process(transfer);
if (cdp_client.blocking_read_end(ctx) == false) {
return error.BlockingInterceptFailure;
}
}
// Above, request will not process if there's an interception request. In such
@@ -232,32 +297,45 @@ fn process(self: *Client, transfer: *Transfer) !void {
// For an intercepted request
pub fn continueTransfer(self: *Client, transfer: *Transfer) !void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted });
return self.process(transfer);
transfer._intercept_state = .@"continue";
if (!transfer.req.blocking) {
return self.process(transfer);
}
}
// For an intercepted request
pub fn abortTransfer(self: *Client, transfer: *Transfer) void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted });
transfer.abort();
transfer._intercept_state = .abort;
if (!transfer.req.blocking) {
transfer.abort();
}
}
// For an intercepted request
pub fn fulfillTransfer(self: *Client, transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted });
return transfer.fulfill(status, headers, body);
transfer._intercept_state = .fulfilled;
try transfer.fulfill(status, headers, body);
if (!transfer.req.blocking) {
transfer.deinit();
}
}
pub fn nextReqId(self: *Client) usize {
@@ -428,7 +506,7 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
}
pub const PerformStatus = enum {
extra_socket,
cdp_socket,
normal,
};
@@ -444,16 +522,16 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
}
var status = PerformStatus.normal;
if (self.extra_socket) |s| {
if (self.cdp_client) |cdp_client| {
var wait_fd = c.curl_waitfd{
.fd = s,
.fd = cdp_client.socket,
.events = c.CURL_WAIT_POLLIN,
.revents = 0,
};
try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
if (wait_fd.revents != 0) {
// the extra socket we passed in is ready, let's signal our caller
status = .extra_socket;
status = .cdp_socket;
}
} else if (running > 0) {
try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
@@ -476,7 +554,8 @@ fn processMessages(self: *Client) !bool {
const transfer = try Transfer.fromEasy(easy);
// In case of auth challenge
if (transfer._auth_challenge != null and transfer._tries < 10) { // TODO give a way to configure the number of auth retries.
// TODO give a way to configure the number of auth retries.
if (transfer._auth_challenge != null and transfer._tries < 10) {
if (transfer.client.notification) |notification| {
var wait_for_interception = false;
notification.dispatch(.http_request_auth_required, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
@@ -486,10 +565,10 @@ fn processMessages(self: *Client) !bool {
// Note: we don't deinit transfer on purpose: we want to keep
// using it for the following request.
self.intercepted += 1;
log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted });
if (builtin.mode == .Debug) {
transfer._intercepted = true;
if (comptime IS_DEBUG) {
log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted });
}
transfer._intercept_state = .pending;
self.endTransfer(transfer);
continue;
}
@@ -668,6 +747,13 @@ pub const Request = struct {
resource_type: ResourceType,
credentials: ?[:0]const u8 = null,
// This is only relevant for intercepted requests. If a request is flagged
// as blocking AND is intercepted, then it'll be up to us to wait until
// we receive a response to the interception. This probably isn't ideal,
// but it's harder for our caller (ScriptManager) to deal with this. One
// reason for that is the Http Client is already a bit CDP-aware.
blocking: bool = false,
// arbitrary data that can be associated with this request
ctx: *anyopaque = undefined,
@@ -768,7 +854,15 @@ pub const Transfer = struct {
// for when a Transfer is queued in the client.queue
_node: std.DoublyLinkedList.Node = .{},
_intercepted: if (builtin.mode == .Debug) bool else void = if (builtin.mode == .Debug) false else {},
_intercept_state: InterceptState = .not_intercepted,
// Where a transfer stands with respect to request interception.
const InterceptState = enum {
// Default value of `_intercept_state`: no interception has been requested
// for this transfer.
not_intercepted,
// An interception notification asked us to wait and no answer has been
// recorded yet.
pending,
// Recorded by Client.continueTransfer: the request is to be processed.
@"continue",
// Recorded by Client.abortTransfer: the transfer is to be aborted.
abort,
// Recorded by Client.fulfillTransfer, which also calls Transfer.fulfill with
// the supplied status, headers and body.
fulfilled,
};
pub fn reset(self: *Transfer) void {
self._redirecting = false;
@@ -879,11 +973,11 @@ pub const Transfer = struct {
// abort. We don't call self.client.endTransfer here b/c it has been done
// before interception process.
pub fn abortAuthChallenge(self: *Transfer) void {
if (builtin.mode == .Debug) {
std.debug.assert(self._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(self._intercept_state != .not_intercepted);
log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted });
}
self.client.intercepted -= 1;
log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted });
self.client.requestFailed(self, error.AbortAuthChallenge);
self.deinit();
}
@@ -990,7 +1084,9 @@ pub const Transfer = struct {
if (std.mem.startsWith(u8, header, "HTTP/")) {
// Is it the first header line.
if (buf_len < 13) {
log.debug(.http, "invalid response line", .{ .line = header });
if (comptime IS_DEBUG) {
log.debug(.http, "invalid response line", .{ .line = header });
}
return 0;
}
const version_start: usize = if (header[5] == '2') 7 else 9;
@@ -1001,7 +1097,9 @@ pub const Transfer = struct {
std.debug.assert(version_end < 13);
const status = std.fmt.parseInt(u16, header[version_start..version_end], 10) catch {
log.debug(.http, "invalid status code", .{ .line = header });
if (comptime IS_DEBUG) {
log.debug(.http, "invalid status code", .{ .line = header });
}
return 0;
};
@@ -1058,7 +1156,9 @@ pub const Transfer = struct {
if (transfer._redirecting) {
// parse and set cookies for the redirection.
redirectionCookies(transfer, easy) catch |err| {
log.debug(.http, "redirection cookies", .{ .err = err });
if (comptime IS_DEBUG) {
log.debug(.http, "redirection cookies", .{ .err = err });
}
return 0;
};
return buf_len;
@@ -1128,7 +1228,7 @@ pub const Transfer = struct {
pub fn fulfill(transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void {
if (transfer._handle != null) {
// should never happen, should have been intercepted/paused, and then
// either continued, aborted and fulfilled once.
// either continued, aborted or fulfilled once.
@branchHint(.unlikely);
return error.RequestInProgress;
}

View File

@@ -90,12 +90,13 @@ pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus {
};
}
pub fn monitorSocket(self: *Http, socket: posix.socket_t) void {
self.client.extra_socket = socket;
// Registers `cdp_client` on the underlying HTTP client. Asserts that no CDP
// client is currently registered, so at most one can be set at a time.
pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void {
std.debug.assert(self.client.cdp_client == null);
self.client.cdp_client = cdp_client;
}
pub fn unmonitorSocket(self: *Http) void {
self.client.extra_socket = null;
// Clears the CDP client registered on the underlying HTTP client (sets the
// field back to null).
pub fn removeCDPClient(self: *Http) void {
self.client.cdp_client = null;
}
pub fn newConnection(self: *Http) !Connection {