From 01223601f287797e0620700597bed34bce487a80 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Fri, 15 Aug 2025 14:01:57 +0800 Subject: [PATCH] Reduce allocations made during request interception Stream (to json) the Transfer as a request and response object in the various network interception-related events (e.g. Network.responseReceived). Add a page.request_intercepted boolean flag for CDP to signal the page that requests have been intercepted, allowing Page.wait to prioritize intercept handling (or, at least, not block it). --- src/browser/page.zig | 54 ++++++++-- src/cdp/cdp.zig | 2 +- src/cdp/domains/fetch.zig | 114 ++++++++++++-------- src/cdp/domains/network.zig | 203 ++++++++++++++++++++++++------------ src/cdp/domains/target.zig | 4 +- src/http/Http.zig | 58 +++++------ 6 files changed, 280 insertions(+), 155 deletions(-) diff --git a/src/browser/page.zig b/src/browser/page.zig index a02a447b..fa784f84 100644 --- a/src/browser/page.zig +++ b/src/browser/page.zig @@ -94,6 +94,16 @@ pub const Page = struct { load_state: LoadState = .parsing, + // Page.wait balances waiting for resources / tasks and producing an output. + // Up until a timeout, Page.wait will always wait for inflight or pending + // HTTP requests, via the Http.Client.active counter. However, intercepted + // requests (via CDP, but it could be anything), aren't considered "active" + // connection. So it's possible that we have intercepted requests (which are + // pending on some driver to continue/abort) while Http.Client.active == 0. + // This boolean exists to supplment Http.Client.active and inform Page.wait + // of pending connections. + request_intercepted: bool = false, + const Mode = union(enum) { pre: void, err: anyerror, @@ -275,16 +285,26 @@ pub const Page = struct { while (true) { SW: switch (self.mode) { .pre, .raw => { + if (self.request_intercepted) { + // the page request was intercepted. + + // there shouldn't be any active requests; + std.debug.assert(http_client.active == 0); + + // nothing we can do for this, need to kick the can up + // the chain and wait for activity (e.g. a CDP message) + // to unblock this. + return; + } + // The main page hasn't started/finished navigating. // There's no JS to run, and no reason to run the scheduler. - if (http_client.active == 0) { // haven't started navigating, I guess. return; } // There should only be 1 active http transfer, the main page - std.debug.assert(http_client.active == 1); try http_client.tick(ms_remaining); }, .html, .parsed => { @@ -330,17 +350,29 @@ pub const Page = struct { _ = try scheduler.runLowPriority(); - // We'll block here, waiting for network IO. We know - // when the next timeout is scheduled, and we know how long - // the caller wants to wait for, so we can pick a good wait - // duration - const ms_to_wait = @min(ms_remaining, ms_to_next_task orelse 1000); + const request_intercepted = self.request_intercepted; + + // We want to prioritize processing intercepted requests + // because, the sooner they get unblocked, the sooner we + // can start the HTTP request. But we still want to advanced + // existing HTTP requests, if possible. So, if we have + // intercepted requests, we'll still look at existing HTTP + // requests, but we won't block waiting for more data. + const ms_to_wait = + if (request_intercepted) 0 + + // But if we have no intercepted requests, we'll wait + // for as long as we can for data to our existing + // inflight requests + else @min(ms_remaining, ms_to_next_task orelse 1000); + try http_client.tick(ms_to_wait); - if (try_catch.hasCaught()) { - const msg = (try try_catch.err(self.arena)) orelse "unknown"; - log.warn(.user_script, "page wait", .{ .err = msg, .src = "data" }); - return error.JsError; + if (request_intercepted) { + // Again, proritizing intercepted requests. Exit this + // loop so that our caller can hopefully resolve them + // (i.e. continue or abort them); + return; } }, .err => |err| return err, diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 2435f7fd..23259ecc 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -495,7 +495,7 @@ pub fn BrowserContext(comptime CDP_T: type) type { pub fn onHttpRequestIntercept(ctx: *anyopaque, data: *const Notification.RequestIntercept) !void { const self: *Self = @alignCast(@ptrCast(ctx)); defer self.resetNotificationArena(); - try @import("domains/fetch.zig").requestPaused(self.notification_arena, self, data); + try @import("domains/fetch.zig").requestIntercept(self.notification_arena, self, data); } pub fn onHttpRequestFail(ctx: *anyopaque, data: *const Notification.RequestFail) !void { diff --git a/src/cdp/domains/fetch.zig b/src/cdp/domains/fetch.zig index 6b02ae58..01ebb356 100644 --- a/src/cdp/domains/fetch.zig +++ b/src/cdp/domains/fetch.zig @@ -18,10 +18,13 @@ const std = @import("std"); const Allocator = std.mem.Allocator; -const Notification = @import("../../notification.zig").Notification; + const log = @import("../../log.zig"); +const network = @import("network.zig"); + const Method = @import("../../http/Client.zig").Method; const Transfer = @import("../../http/Client.zig").Transfer; +const Notification = @import("../../notification.zig").Notification; pub fn processMessage(cmd: anytype) !void { const action = std.meta.stringToEnum(enum { @@ -41,22 +44,38 @@ pub fn processMessage(cmd: anytype) !void { // Stored in CDP pub const InterceptState = struct { - const Self = @This(); - waiting: std.AutoArrayHashMap(u64, *Transfer), + allocator: Allocator, + waiting: std.AutoArrayHashMapUnmanaged(u64, *Transfer), pub fn init(allocator: Allocator) !InterceptState { return .{ - .waiting = std.AutoArrayHashMap(u64, *Transfer).init(allocator), + .waiting = .empty, + .allocator = allocator, }; } - pub fn deinit(self: *Self) void { - self.waiting.deinit(); + pub fn empty(self: *const InterceptState) bool { + return self.waiting.count() == 0; + } + + pub fn put(self: *InterceptState, transfer: *Transfer) !void { + return self.waiting.put(self.allocator, transfer.id, transfer); + } + + pub fn remove(self: *InterceptState, id: u64) ?*Transfer { + const entry = self.waiting.fetchSwapRemove(id) orelse return null; + return entry.value; + } + + pub fn deinit(self: *InterceptState) void { + self.waiting.deinit(self.allocator); } }; const RequestPattern = struct { - urlPattern: []const u8 = "*", // Wildcards ('*' -> zero or more, '?' -> exactly one) are allowed. Escape character is backslash. Omitting is equivalent to "*". + // Wildcards ('*' -> zero or more, '?' -> exactly one) are allowed. + // Escape character is backslash. Omitting is equivalent to "*". + urlPattern: []const u8 = "*", resourceType: ?ResourceType = null, requestStage: RequestStage = .Request, }; @@ -115,8 +134,12 @@ fn disable(cmd: anytype) !void { fn enable(cmd: anytype) !void { const params = (try cmd.params(EnableParam)) orelse EnableParam{}; - if (params.patterns.len != 0) log.warn(.cdp, "Fetch.enable No patterns yet", .{}); - if (params.handleAuthRequests) log.warn(.cdp, "Fetch.enable No auth yet", .{}); + if (params.patterns.len != 0) { + log.warn(.cdp, "not implemented", .{ .feature = "Fetch.enable No patterns yet" }); + } + if (params.handleAuthRequests) { + log.warn(.cdp, "not implemented", .{ .feature = "Fetch.enable No auth yet" }); + } const bc = cmd.browser_context orelse return error.BrowserContextNotLoaded; try bc.fetchEnable(); @@ -124,51 +147,33 @@ fn enable(cmd: anytype) !void { return cmd.sendResult(null, .{}); } -pub fn requestPaused(arena: Allocator, bc: anytype, intercept: *const Notification.RequestIntercept) !void { +pub fn requestIntercept(arena: Allocator, bc: anytype, intercept: *const Notification.RequestIntercept) !void { var cdp = bc.cdp; // unreachable because we _have_ to have a page. const session_id = bc.session_id orelse unreachable; const target_id = bc.target_id orelse unreachable; + const page = bc.session.currentPage() orelse unreachable; // We keep it around to wait for modifications to the request. // NOTE: we assume whomever created the request created it with a lifetime of the Page. // TODO: What to do when receiving replies for a previous page's requests? const transfer = intercept.transfer; - try cdp.intercept_state.waiting.put(transfer.id, transfer); - - // NOTE: .request data preparation is duped from network.zig - const full_request_url = transfer.uri; - const request_url = try @import("network.zig").urlToString(arena, &full_request_url, .{ - .scheme = true, - .authentication = true, - .authority = true, - .path = true, - .query = true, - }); - const request_fragment = try @import("network.zig").urlToString(arena, &full_request_url, .{ - .fragment = true, - }); - const headers = try transfer.req.headers.asHashMap(arena); - // End of duped code + try cdp.intercept_state.put(transfer); try cdp.sendEvent("Fetch.requestPaused", .{ .requestId = try std.fmt.allocPrint(arena, "INTERCEPT-{d}", .{transfer.id}), - .request = .{ - .url = request_url, - .urlFragment = request_fragment, - .method = @tagName(transfer.req.method), - .hasPostData = transfer.req.body != null, - .headers = std.json.ArrayHashMap([]const u8){ .map = headers }, - }, + .request = network.TransferAsRequestWriter.init(transfer), .frameId = target_id, .resourceType = ResourceType.Document, // TODO! .networkId = try std.fmt.allocPrint(arena, "REQ-{d}", .{transfer.id}), }, .{ .session_id = session_id }); // Await either continueRequest, failRequest or fulfillRequest + intercept.wait_for_interception.* = true; + page.request_intercepted = true; } const HeaderEntry = struct { @@ -186,39 +191,58 @@ fn continueRequest(cmd: anytype) !void { headers: ?[]const HeaderEntry = null, interceptResponse: bool = false, })) orelse return error.InvalidParams; - if (params.postData != null or params.headers != null or params.interceptResponse) return error.NotYetImplementedParams; + if (params.postData != null or params.headers != null or params.interceptResponse) { + return error.NotYetImplementedParams; + } + + const page = bc.session.currentPage() orelse return error.PageNotLoaded; + + var intercept_state = &bc.cdp.intercept_state; const request_id = try idFromRequestId(params.requestId); - const entry = bc.cdp.intercept_state.waiting.fetchSwapRemove(request_id) orelse return error.RequestNotFound; - const transfer = entry.value; + const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound; // Update the request with the new parameters if (params.url) |url| { - // The request url must be modified in a way that's not observable by page. So page.url is not updated. - try transfer.updateURL(try bc.cdp.browser.page_arena.allocator().dupeZ(u8, url)); + // The request url must be modified in a way that's not observable by page. + // So page.url is not updated. + try transfer.updateURL(try page.arena.dupeZ(u8, url)); } if (params.method) |method| { transfer.req.method = std.meta.stringToEnum(Method, method) orelse return error.InvalidParams; } - log.info(.cdp, "Request continued by intercept", .{ .id = params.requestId }); + log.info(.cdp, "Request continued by intercept", .{ + .id = params.requestId, + .url = transfer.uri, + }); try bc.cdp.browser.http_client.process(transfer); + if (intercept_state.empty()) { + page.request_intercepted = false; + } + return cmd.sendResult(null, .{}); } fn failRequest(cmd: anytype) !void { const bc = cmd.browser_context orelse return error.BrowserContextNotLoaded; - var state = &bc.cdp.intercept_state; const params = (try cmd.params(struct { requestId: []const u8, // "INTERCEPT-{d}" errorReason: ErrorReason, })) orelse return error.InvalidParams; + const page = bc.session.currentPage() orelse return error.PageNotLoaded; + + var intercept_state = &bc.cdp.intercept_state; const request_id = try idFromRequestId(params.requestId); - const entry = state.waiting.fetchSwapRemove(request_id) orelse return error.RequestNotFound; - // entry.value is the transfer - entry.value.abort(); + + const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound; + transfer.abort(); + + if (intercept_state.empty()) { + page.request_intercepted = false; + } log.info(.cdp, "Request aborted by intercept", .{ .reason = params.errorReason }); return cmd.sendResult(null, .{}); @@ -226,6 +250,8 @@ fn failRequest(cmd: anytype) !void { // Get u64 from requestId which is formatted as: "INTERCEPT-{d}" fn idFromRequestId(request_id: []const u8) !u64 { - if (!std.mem.startsWith(u8, request_id, "INTERCEPT-")) return error.InvalidParams; + if (!std.mem.startsWith(u8, request_id, "INTERCEPT-")) { + return error.InvalidParams; + } return std.fmt.parseInt(u64, request_id[10..], 10) catch return error.InvalidParams; } diff --git a/src/cdp/domains/network.zig b/src/cdp/domains/network.zig index d3f30be2..99e43a6e 100644 --- a/src/cdp/domains/network.zig +++ b/src/cdp/domains/network.zig @@ -19,10 +19,10 @@ const std = @import("std"); const Allocator = std.mem.Allocator; -const Notification = @import("../../notification.zig").Notification; const log = @import("../../log.zig"); const CdpStorage = @import("storage.zig"); const Transfer = @import("../../http/Client.zig").Transfer; +const Notification = @import("../../notification.zig").Notification; pub fn processMessage(cmd: anytype) !void { const action = std.meta.stringToEnum(enum { @@ -239,46 +239,12 @@ pub fn httpRequestStart(arena: Allocator, bc: anytype, data: *const Notification try data.transfer.req.headers.add(extra); } - const document_url = try urlToString(arena, &page.url.uri, .{ - .scheme = true, - .authentication = true, - .authority = true, - .path = true, - .query = true, - }); - const transfer = data.transfer; - const full_request_url = transfer.uri; - const request_url = try urlToString(arena, &full_request_url, .{ - .scheme = true, - .authentication = true, - .authority = true, - .path = true, - .query = true, - }); - const request_fragment = try urlToString(arena, &full_request_url, .{ - .fragment = true, // TODO since path is false, this likely does not work as intended - }); - - const headers = try transfer.req.headers.asHashMap(arena); - // We're missing a bunch of fields, but, for now, this seems like enough - try cdp.sendEvent("Network.requestWillBeSent", .{ - .requestId = try std.fmt.allocPrint(arena, "REQ-{d}", .{transfer.id}), - .frameId = target_id, - .loaderId = bc.loader_id, - .documentUrl = document_url, - .request = .{ - .url = request_url, - .urlFragment = request_fragment, - .method = @tagName(transfer.req.method), - .hasPostData = transfer.req.body != null, - .headers = std.json.ArrayHashMap([]const u8){ .map = headers }, - }, - }, .{ .session_id = session_id }); + try cdp.sendEvent("Network.requestWillBeSent", .{ .requestId = try std.fmt.allocPrint(arena, "REQ-{d}", .{transfer.id}), .frameId = target_id, .loaderId = bc.loader_id, .documentUrl = DocumentUrlWriter.init(&page.url.uri), .request = TransferAsRequestWriter.init(transfer) }, .{ .session_id = session_id }); } -pub fn httpHeadersDone(arena: Allocator, bc: anytype, request: *const Notification.ResponseHeadersDone) !void { +pub fn httpHeadersDone(arena: Allocator, bc: anytype, data: *const Notification.ResponseHeadersDone) !void { // Isn't possible to do a network request within a Browser (which our // notification is tied to), without a page. std.debug.assert(bc.session.page != null); @@ -289,56 +255,159 @@ pub fn httpHeadersDone(arena: Allocator, bc: anytype, request: *const Notificati const session_id = bc.session_id orelse unreachable; const target_id = bc.target_id orelse unreachable; - const url = try urlToString(arena, &request.transfer.uri, .{ - .scheme = true, - .authentication = true, - .authority = true, - .path = true, - .query = true, - }); - - const status = request.transfer.response_header.?.status; - // We're missing a bunch of fields, but, for now, this seems like enough try cdp.sendEvent("Network.responseReceived", .{ - .requestId = try std.fmt.allocPrint(arena, "REQ-{d}", .{request.transfer.id}), + .requestId = try std.fmt.allocPrint(arena, "REQ-{d}", .{data.transfer.id}), .loaderId = bc.loader_id, - .response = .{ - .url = url, - .status = status, - .statusText = @as(std.http.Status, @enumFromInt(status)).phrase() orelse "Unknown", - .headers = ResponseHeaderWriter.init(request.transfer), - }, .frameId = target_id, + .response = TransferAsResponseWriter.init(data.transfer), }, .{ .session_id = session_id }); } -pub fn urlToString(arena: Allocator, url: *const std.Uri, opts: std.Uri.WriteToStreamOptions) ![]const u8 { - var buf: std.ArrayListUnmanaged(u8) = .empty; - try url.writeToStream(opts, buf.writer(arena)); - return buf.items; -} - -const ResponseHeaderWriter = struct { +pub const TransferAsRequestWriter = struct { transfer: *Transfer, - fn init(transfer: *Transfer) ResponseHeaderWriter { + pub fn init(transfer: *Transfer) TransferAsRequestWriter { return .{ .transfer = transfer, }; } - pub fn jsonStringify(self: *const ResponseHeaderWriter, writer: anytype) !void { + pub fn jsonStringify(self: *const TransferAsRequestWriter, writer: anytype) !void { + const stream = writer.stream; + const transfer = self.transfer; + try writer.beginObject(); - var it = self.transfer.responseHeaderIterator(); - while (it.next()) |hdr| { - try writer.objectField(hdr.name); - try writer.write(hdr.value); + { + try writer.objectField("url"); + try writer.beginWriteRaw(); + try stream.writeByte('\"'); + try transfer.uri.writeToStream(.{ + .scheme = true, + .authentication = true, + .authority = true, + .path = true, + .query = true, + }, stream); + try stream.writeByte('\"'); + writer.endWriteRaw(); + } + + { + if (transfer.uri.fragment) |frag| { + try writer.objectField("urlFragment"); + try writer.beginWriteRaw(); + try stream.writeAll("\"#"); + try stream.writeAll(frag.percent_encoded); + try stream.writeByte('\"'); + writer.endWriteRaw(); + } + } + + { + try writer.objectField("method"); + try writer.write(@tagName(transfer.req.method)); + } + + { + try writer.objectField("hasPostData"); + try writer.write(transfer.req.body != null); + } + + { + try writer.objectField("headers"); + try writer.beginObject(); + var it = transfer.req.headers.iterator(); + while (it.next()) |hdr| { + try writer.objectField(hdr.name); + try writer.write(hdr.value); + } + try writer.endObject(); } try writer.endObject(); } }; +const TransferAsResponseWriter = struct { + transfer: *Transfer, + + fn init(transfer: *Transfer) TransferAsResponseWriter { + return .{ + .transfer = transfer, + }; + } + + pub fn jsonStringify(self: *const TransferAsResponseWriter, writer: anytype) !void { + const stream = writer.stream; + const transfer = self.transfer; + + try writer.beginObject(); + { + try writer.objectField("url"); + try writer.beginWriteRaw(); + try stream.writeByte('\"'); + try transfer.uri.writeToStream(.{ + .scheme = true, + .authentication = true, + .authority = true, + .path = true, + .query = true, + }, stream); + try stream.writeByte('\"'); + writer.endWriteRaw(); + } + + if (transfer.response_header) |*rh| { + // it should not be possible for this to be false, but I'm not + // feeling brave today. + const status = rh.status; + try writer.objectField("status"); + try writer.write(status); + + try writer.objectField("statusText"); + try writer.write(@as(std.http.Status, @enumFromInt(status)).phrase() orelse "Unknown"); + } + + { + try writer.objectField("headers"); + try writer.beginObject(); + var it = transfer.responseHeaderIterator(); + while (it.next()) |hdr| { + try writer.objectField(hdr.name); + try writer.write(hdr.value); + } + try writer.endObject(); + } + try writer.endObject(); + } +}; + +const DocumentUrlWriter = struct { + uri: *std.Uri, + + fn init(uri: *std.Uri) DocumentUrlWriter { + return .{ + .uri = uri, + }; + } + + pub fn jsonStringify(self: *const DocumentUrlWriter, writer: anytype) !void { + const stream = writer.stream; + + try writer.beginWriteRaw(); + try stream.writeByte('\"'); + try self.uri.writeToStream(.{ + .scheme = true, + .authentication = true, + .authority = true, + .path = true, + .query = true, + }, stream); + try stream.writeByte('\"'); + writer.endWriteRaw(); + } +}; + const testing = @import("../testing.zig"); test "cdp.network setExtraHTTPHeaders" { var ctx = testing.context(); diff --git a/src/cdp/domains/target.zig b/src/cdp/domains/target.zig index 0a3ed2e0..ced51f74 100644 --- a/src/cdp/domains/target.zig +++ b/src/cdp/domains/target.zig @@ -73,7 +73,9 @@ fn createBrowserContext(cmd: anytype) !void { originsWithUniversalNetworkAccess: ?[]const []const u8 = null, }); if (params) |p| { - if (p.disposeOnDetach or p.proxyBypassList != null or p.originsWithUniversalNetworkAccess != null) std.debug.print("Target.createBrowserContext: Not implemented param set\n", .{}); + if (p.disposeOnDetach or p.proxyBypassList != null or p.originsWithUniversalNetworkAccess != null) { + log.warn(.cdp, "not implemented", .{ .feature = "Target.createBrowserContext: Not implemented param set" }); + } } const bc = cmd.createBrowserContext() catch |err| switch (err) { diff --git a/src/http/Http.zig b/src/http/Http.zig index 1d8cde58..85187d6b 100644 --- a/src/http/Http.zig +++ b/src/http/Http.zig @@ -225,6 +225,11 @@ pub const Headers = struct { headers: *c.curl_slist, cookies: ?[*c]const u8, + const Header = struct { + name: []const u8, + value: []const u8, + }; + pub fn init() !Headers { const header_list = c.curl_slist_append(null, "User-Agent: Lightpanda/1.0"); if (header_list == null) return error.OutOfMemory; @@ -242,25 +247,7 @@ pub const Headers = struct { self.headers = updated_headers; } - pub fn asHashMap(self: *const Headers, allocator: Allocator) !std.StringArrayHashMapUnmanaged([]const u8) { - var list: std.StringArrayHashMapUnmanaged([]const u8) = .empty; - try list.ensureTotalCapacity(allocator, self.count()); - - var current: [*c]c.curl_slist = self.headers; - while (current) |node| { - const str = std.mem.span(@as([*:0]const u8, @ptrCast(node.*.data))); - const header = parseHeader(str) orelse return error.InvalidHeader; - list.putAssumeCapacity(header.name, header.value); - current = node.*.next; - } - // special case for cookies - if (self.cookies) |v| { - list.putAssumeCapacity("Cookie", std.mem.span(@as([*:0]const u8, @ptrCast(v)))); - } - return list; - } - - pub fn parseHeader(header_str: []const u8) ?std.http.Header { + pub fn parseHeader(header_str: []const u8) ?Header { const colon_pos = std.mem.indexOfScalar(u8, header_str, ':') orelse return null; const name = std.mem.trim(u8, header_str[0..colon_pos], " \t"); @@ -269,19 +256,28 @@ pub const Headers = struct { return .{ .name = name, .value = value }; } - pub fn count(self: *const Headers) usize { - var current: [*c]c.curl_slist = self.headers; - var num: usize = 0; - while (current) |node| { - num += 1; - current = node.*.next; - } - // special case for cookies - if (self.cookies != null) { - num += 1; - } - return num; + pub fn iterator(self: *Headers) Iterator { + return .{ + .header = self.headers, + .cookies = self.cookies, + }; } + + const Iterator = struct { + header: [*c]c.curl_slist, + cookies: ?[*c]const u8, + + pub fn next(self: *Iterator) ?Header { + const h = self.header orelse { + const cookies = self.cookies orelse return null; + self.cookies = null; + return .{ .name = "Cookie", .value = std.mem.span(@as([*:0]const u8, cookies)) }; + }; + + self.header = h.*.next; + return parseHeader(std.mem.span(@as([*:0]const u8, @ptrCast(h.*.data)))); + } + }; }; pub fn errorCheck(code: c.CURLcode) errors.Error!void {