Merge pull request #1290 from lightpanda-io/zigdom_request_interception

Zigdom request interception
This commit is contained in:
Karl Seguin
2025-12-25 00:40:48 +08:00
committed by GitHub
8 changed files with 227 additions and 84 deletions

View File

@@ -123,17 +123,23 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
defer client.deinit();
var http = &self.app.http;
http.monitorSocket(socket);
defer http.unmonitorSocket();
http.addCDPClient(.{
.socket = socket,
.ctx = client,
.blocking_read_start = Client.blockingReadStart,
.blocking_read = Client.blockingRead,
.blocking_read_end = Client.blockingReadStop,
});
defer http.removeCDPClient();
std.debug.assert(client.mode == .http);
while (true) {
if (http.poll(timeout_ms) != .extra_socket) {
if (http.poll(timeout_ms) != .cdp_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
if (try client.readSocket() == false) {
if (client.readSocket() == false) {
return;
}
@@ -147,19 +153,19 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
var ms_remaining = timeout_ms;
while (true) {
switch (cdp.pageWait(ms_remaining)) {
.extra_socket => {
if (try client.readSocket() == false) {
.cdp_socket => {
if (client.readSocket() == false) {
return;
}
last_message = timestamp(.monotonic);
ms_remaining = timeout_ms;
},
.no_page => {
if (http.poll(ms_remaining) != .extra_socket) {
if (http.poll(ms_remaining) != .cdp_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
if (try client.readSocket() == false) {
if (client.readSocket() == false) {
return;
}
last_message = timestamp(.monotonic);
@@ -229,7 +235,30 @@ pub const Client = struct {
self.send_arena.deinit();
}
fn readSocket(self: *Client) !bool {
fn blockingReadStart(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| {
log.warn(.app, "CDP blockingReadStart", .{ .err = err });
return false;
};
return true;
}
fn blockingRead(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
return self.readSocket();
}
fn blockingReadStop(ctx: *anyopaque) bool {
const self: *Client = @ptrCast(@alignCast(ctx));
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
log.warn(.app, "CDP blockingReadStop", .{ .err = err });
return false;
};
return true;
}
fn readSocket(self: *Client) bool {
const n = posix.read(self.socket, self.readBuf()) catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;

View File

@@ -743,7 +743,7 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult {
var scheduler = &self.scheduler;
var http_client = self._session.browser.http_client;
// I'd like the page to know NOTHING about extra_socket / CDP, but the
// I'd like the page to know NOTHING about cdp_socket / CDP, but the
// fact is that the behavior of wait changes depending on whether or
// not we're using CDP.
// If we aren't using CDP, as soon as we think there's nothing left
@@ -753,7 +753,7 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult {
// we could let CDP poll http (like it does for HTTP requests), the fact
// is that we know more about the timing of stuff (e.g. how long to
// poll/sleep) in the page.
const exit_when_done = http_client.extra_socket == null;
const exit_when_done = http_client.cdp_client == null;
// for debugging
// defer self.printWaitAnalysis();
@@ -770,15 +770,15 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult {
// Either we have active http connections, or we're in CDP
// mode with an extra socket. Either way, we're waiting
// for http traffic
if (try http_client.tick(@intCast(ms_remaining)) == .extra_socket) {
if (try http_client.tick(@intCast(ms_remaining)) == .cdp_socket) {
// exit_when_done is explicitly set when there isn't
// an extra socket, so it should not be possibl to
// get an extra_socket message when exit_when_done
// get an cdp_socket message when exit_when_done
// is true.
std.debug.assert(exit_when_done == false);
// data on a socket we aren't handling, return to caller
return .extra_socket;
return .cdp_socket;
}
},
.html, .complete => {
@@ -846,13 +846,13 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult {
} else {
// We're here because we either have active HTTP
// connections, or exit_when_done == false (aka, there's
// an extra_socket registered with the http client).
// an cdp_socket registered with the http client).
// We should continue to run lowPriority tasks, so we
// minimize how long we'll poll for network I/O.
const ms_to_wait = @min(200, @min(ms_remaining, ms_to_next_task orelse 200));
if (try http_client.tick(ms_to_wait) == .extra_socket) {
if (try http_client.tick(ms_to_wait) == .cdp_socket) {
// data on a socket we aren't handling, return to caller
return .extra_socket;
return .cdp_socket;
}
}
},

View File

@@ -244,6 +244,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e
},
};
const is_blocking = script.mode == .normal;
if (remote_url) |url| {
errdefer script.deinit(true);
@@ -255,6 +256,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e
.ctx = script,
.method = .GET,
.headers = headers,
.blocking = is_blocking,
.cookie_jar = &page._session.cookie_jar,
.resource_type = .script,
.start_callback = if (log.enabled(.http, .debug)) Script.startCallback else null,
@@ -274,7 +276,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e
}
}
if (script.mode != .normal) {
if (is_blocking == false) {
const list = self.scriptList(script);
list.append(&script.node);
return;

View File

@@ -137,7 +137,7 @@ pub fn currentPage(self: *Session) ?*Page {
pub const WaitResult = enum {
done,
no_page,
extra_socket,
cdp_socket,
navigate,
};

View File

@@ -89,7 +89,7 @@ pub fn init(page: *Page) !*XMLHttpRequest {
pub fn deinit(self: *XMLHttpRequest) void {
if (self.transfer) |transfer| {
transfer.abort();
transfer.abort(error.Abort);
self.transfer = null;
}
}
@@ -115,7 +115,7 @@ pub fn setOnReadyStateChange(self: *XMLHttpRequest, cb_: ?js.Function) !void {
pub fn open(self: *XMLHttpRequest, method_: []const u8, url: [:0]const u8) !void {
// Abort any in-progress request
if (self._transfer) |transfer| {
transfer.abort();
transfer.abort(error.Abort);
self._transfer = null;
}
@@ -373,7 +373,7 @@ fn httpErrorCallback(ctx: *anyopaque, err: anyerror) void {
pub fn abort(self: *XMLHttpRequest) void {
self.handleError(error.Abort);
if (self._transfer) |transfer| {
transfer.abort();
transfer.abort(error.Abort);
self._transfer = null;
}
}

View File

@@ -408,7 +408,7 @@ pub fn BrowserContext(comptime CDP_T: type) type {
// abort all intercepted requests before closing the sesion/page
// since some of these might callback into the page/scriptmanager
for (self.intercept_state.pendingTransfers()) |transfer| {
transfer.abort();
transfer.abort(error.ClientDisconnect);
}
// If the session has a page, we need to clear it first. The page

View File

@@ -34,6 +34,8 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const errorCheck = Http.errorCheck;
const errorMCheck = Http.errorMCheck;
const IS_DEBUG = builtin.mode == .Debug;
const Method = Http.Method;
// This is loosely tied to a browser Page. Loading all the <scripts>, doing
@@ -100,10 +102,25 @@ use_proxy: bool,
// The complete user-agent header line
user_agent: [:0]const u8,
// libcurl can monitor arbitrary sockets. Currently, we ever [maybe] want to
// monitor the CDP client socket, so we've done the simplest thing possible
// by having this single optional field
extra_socket: ?posix.socket_t = null,
cdp_client: ?CDPClient = null,
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
// both HTTP data as well as messages from an CDP connection.
// Furthermore, we have some tension between blocking scripts and request
// interception. For non-blocking scripts, because nothing blocks, we can
// just queue the scripts until we receive a response to the interception
// notification. But for blocking scripts (which block the parser), it's hard
// to return control back to the CDP loop. So the `read` function pointer is
// used by the Client to have the CDP client read more data from the socket,
// specifically when we're waiting for a request interception response to
// a blocking script.
pub const CDPClient = struct {
socket: posix.socket_t,
ctx: *anyopaque,
blocking_read_start: *const fn (*anyopaque) bool,
blocking_read: *const fn (*anyopaque) bool,
blocking_read_end: *const fn (*anyopaque) bool,
};
const TransferQueue = std.DoublyLinkedList;
@@ -175,7 +192,7 @@ pub fn abort(self: *Client) void {
// We can remove some (all?) of these once we're confident its right.
std.debug.assert(self.handles.in_use.first == null);
std.debug.assert(self.handles.available.len() == self.handles.handles.len);
if (builtin.mode == .Debug) {
if (comptime IS_DEBUG) {
var running: c_int = undefined;
std.debug.assert(c.curl_multi_perform(self.multi, &running) == c.CURLE_OK);
std.debug.assert(running == 0);
@@ -200,23 +217,75 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
pub fn request(self: *Client, req: Request) !void {
const transfer = try self.makeTransfer(req);
if (self.notification) |notification| {
notification.dispatch(.http_request_start, &.{ .transfer = transfer });
const notification = self.notification orelse return self.process(transfer);
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception) {
self.intercepted += 1;
log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted });
if (builtin.mode == .Debug) {
transfer._intercepted = true;
}
// The user is send an invitation to intercept this request.
return;
}
notification.dispatch(.http_request_start, &.{ .transfer = transfer });
var wait_for_interception = false;
notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception == false) {
// request not intercepted, process it normally
return self.process(transfer);
}
return self.process(transfer);
self.intercepted += 1;
if (comptime IS_DEBUG) {
log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted });
}
transfer._intercept_state = .pending;
if (req.blocking == false) {
// The request was interecepted, but it isn't a blocking request, so we
// dont' need to block this call. The request will be unblocked
// asynchronously via eitehr continueTransfer or abortTransfer
return;
}
if (try self.waitForInterceptedResponse(transfer)) {
return self.process(transfer);
}
}
fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool {
// The request was intercepted and is blocking. This is messy, but our
// callers, the ScriptManager -> Page, don't have a great way to stop the
// parser and return control to the CDP server to wait for the interception
// response. We have some information on the CDPClient, so we'll do the
// blocking here. (This is a bit of a legacy thing. Initially the Client
// had a 'extra_socket' that it could monitor. It was named 'extra_socket'
// to appear generic, but really, that 'extra_socket' was always the CDP
// socket. Because we already had the "extra_socket" here, it was easier to
// make it even more CDP- aware and turn `extra_socket: socket_t` into the
// current CDPClient and do the blocking here).
const cdp_client = self.cdp_client.?;
const ctx = cdp_client.ctx;
if (cdp_client.blocking_read_start(ctx) == false) {
return error.BlockingInterceptFailure;
}
defer _ = cdp_client.blocking_read_end(ctx);
while (true) {
if (cdp_client.blocking_read(ctx) == false) {
return error.BlockingInterceptFailure;
}
switch (transfer._intercept_state) {
.pending => continue, // keep waiting
.@"continue" => return true,
.abort => |err| {
transfer.abort(err);
return false;
},
.fulfilled => {
// callbacks already called, just need to cleanups
transfer.deinit();
return false;
},
.not_intercepted => unreachable,
}
}
}
// Above, request will not process if there's an interception request. In such
@@ -232,32 +301,45 @@ fn process(self: *Client, transfer: *Transfer) !void {
// For an intercepted request
pub fn continueTransfer(self: *Client, transfer: *Transfer) !void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted });
return self.process(transfer);
if (!transfer.req.blocking) {
return self.process(transfer);
}
transfer._intercept_state = .@"continue";
}
// For an intercepted request
pub fn abortTransfer(self: *Client, transfer: *Transfer) void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted });
transfer.abort();
if (!transfer.req.blocking) {
transfer.abort(error.Abort);
}
transfer._intercept_state = .{ .abort = error.Abort };
}
// For an intercepted request
pub fn fulfillTransfer(self: *Client, transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void {
if (builtin.mode == .Debug) {
std.debug.assert(transfer._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(transfer._intercept_state != .not_intercepted);
log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted });
}
self.intercepted -= 1;
log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted });
return transfer.fulfill(status, headers, body);
try transfer.fulfill(status, headers, body);
if (!transfer.req.blocking) {
transfer.deinit();
}
transfer._intercept_state = .fulfilled;
}
pub fn nextReqId(self: *Client) usize {
@@ -428,7 +510,7 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
}
pub const PerformStatus = enum {
extra_socket,
cdp_socket,
normal,
};
@@ -444,16 +526,16 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
}
var status = PerformStatus.normal;
if (self.extra_socket) |s| {
if (self.cdp_client) |cdp_client| {
var wait_fd = c.curl_waitfd{
.fd = s,
.fd = cdp_client.socket,
.events = c.CURL_WAIT_POLLIN,
.revents = 0,
};
try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
if (wait_fd.revents != 0) {
// the extra socket we passed in is ready, let's signal our caller
status = .extra_socket;
status = .cdp_socket;
}
} else if (running > 0) {
try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
@@ -476,22 +558,27 @@ fn processMessages(self: *Client) !bool {
const transfer = try Transfer.fromEasy(easy);
// In case of auth challenge
if (transfer._auth_challenge != null and transfer._tries < 10) { // TODO give a way to configure the number of auth retries.
// TODO give a way to configure the number of auth retries.
if (transfer._auth_challenge != null and transfer._tries < 10) {
if (transfer.client.notification) |notification| {
var wait_for_interception = false;
notification.dispatch(.http_request_auth_required, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception });
if (wait_for_interception) {
// the request is put on hold to be intercepted.
// In this case we ignore callbacks for now.
// Note: we don't deinit transfer on purpose: we want to keep
// using it for the following request.
self.intercepted += 1;
log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted });
if (builtin.mode == .Debug) {
transfer._intercepted = true;
if (comptime IS_DEBUG) {
log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted });
}
self.endTransfer(transfer);
continue;
transfer._intercept_state = .pending;
if (!transfer.req.blocking) {
// the request is put on hold to be intercepted.
// In this case we ignore callbacks for now.
// Note: we don't deinit transfer on purpose: we want to keep
// using it for the following request
self.endTransfer(transfer);
continue;
}
_ = try self.waitForInterceptedResponse(transfer);
}
}
}
@@ -668,6 +755,13 @@ pub const Request = struct {
resource_type: ResourceType,
credentials: ?[:0]const u8 = null,
// This is only relevant for intercepted requests. If a request is flagged
// as blocking AND is intercepted, then it'll be up to us to wait until
// we receive a response to the interception. This probably isn't ideal,
// but it's harder for our caller (ScriptManager) to deal with this. One
// reason for that is the Http Client is already a bit CDP-aware.
blocking: bool = false,
// arbitrary data that can be associated with this request
ctx: *anyopaque = undefined,
@@ -768,7 +862,15 @@ pub const Transfer = struct {
// for when a Transfer is queued in the client.queue
_node: std.DoublyLinkedList.Node = .{},
_intercepted: if (builtin.mode == .Debug) bool else void = if (builtin.mode == .Debug) false else {},
_intercept_state: InterceptState = .not_intercepted,
const InterceptState = union(enum) {
not_intercepted,
pending,
@"continue",
abort: anyerror,
fulfilled,
};
pub fn reset(self: *Transfer) void {
self._redirecting = false;
@@ -858,8 +960,8 @@ pub const Transfer = struct {
self.req.headers = new_headers;
}
pub fn abort(self: *Transfer) void {
self.client.requestFailed(self, error.Abort);
pub fn abort(self: *Transfer, err: anyerror) void {
self.client.requestFailed(self, err);
if (self._handle != null) {
self.client.endTransfer(self);
}
@@ -879,13 +981,16 @@ pub const Transfer = struct {
// abort. We don't call self.client.endTransfer here b/c it has been done
// before interception process.
pub fn abortAuthChallenge(self: *Transfer) void {
if (builtin.mode == .Debug) {
std.debug.assert(self._intercepted);
if (comptime IS_DEBUG) {
std.debug.assert(self._intercept_state != .not_intercepted);
log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted });
}
self.client.intercepted -= 1;
log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted });
self.client.requestFailed(self, error.AbortAuthChallenge);
self.deinit();
if (!self.req.blocking) {
self.abort(error.AbortAuthChallenge);
return;
}
self._intercept_state = .{ .abort = error.AbortAuthChallenge };
}
// redirectionCookies manages cookies during redirections handled by Curl.
@@ -990,7 +1095,9 @@ pub const Transfer = struct {
if (std.mem.startsWith(u8, header, "HTTP/")) {
// Is it the first header line.
if (buf_len < 13) {
log.debug(.http, "invalid response line", .{ .line = header });
if (comptime IS_DEBUG) {
log.debug(.http, "invalid response line", .{ .line = header });
}
return 0;
}
const version_start: usize = if (header[5] == '2') 7 else 9;
@@ -1001,7 +1108,9 @@ pub const Transfer = struct {
std.debug.assert(version_end < 13);
const status = std.fmt.parseInt(u16, header[version_start..version_end], 10) catch {
log.debug(.http, "invalid status code", .{ .line = header });
if (comptime IS_DEBUG) {
log.debug(.http, "invalid status code", .{ .line = header });
}
return 0;
};
@@ -1058,7 +1167,9 @@ pub const Transfer = struct {
if (transfer._redirecting) {
// parse and set cookies for the redirection.
redirectionCookies(transfer, easy) catch |err| {
log.debug(.http, "redirection cookies", .{ .err = err });
if (comptime IS_DEBUG) {
log.debug(.http, "redirection cookies", .{ .err = err });
}
return 0;
};
return buf_len;
@@ -1128,7 +1239,7 @@ pub const Transfer = struct {
pub fn fulfill(transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void {
if (transfer._handle != null) {
// should never happen, should have been intercepted/paused, and then
// either continued, aborted and fulfilled once.
// either continued, aborted or fulfilled once.
@branchHint(.unlikely);
return error.RequestInProgress;
}

View File

@@ -90,12 +90,13 @@ pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus {
};
}
pub fn monitorSocket(self: *Http, socket: posix.socket_t) void {
self.client.extra_socket = socket;
pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void {
std.debug.assert(self.client.cdp_client == null);
self.client.cdp_client = cdp_client;
}
pub fn unmonitorSocket(self: *Http) void {
self.client.extra_socket = null;
pub fn removeCDPClient(self: *Http) void {
self.client.cdp_client = null;
}
pub fn newConnection(self: *Http) !Connection {