diff --git a/src/Server.zig b/src/Server.zig index 6800ddc9..16d438ec 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -260,14 +260,14 @@ pub const Client = struct { fn start(self: *Client) void { const http = self.http; - http.cdp_client = .{ + http.setCdpClient(.{ .socket = self.ws.socket, .ctx = self, .blocking_read_start = Client.blockingReadStart, .blocking_read = Client.blockingRead, .blocking_read_end = Client.blockingReadStop, - }; - defer http.cdp_client = null; + }); + defer http.setCdpClient(null); self.httpLoop(http) catch |err| { log.err(.app, "CDP client loop", .{ .err = err }); diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 6247e323..0e73d219 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -21,20 +21,19 @@ const builtin = @import("builtin"); const posix = std.posix; const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; - const lp = @import("lightpanda"); const log = lp.log; const URL = @import("URL.zig"); const Config = @import("../Config.zig"); const Notification = @import("../Notification.zig"); const CookieJar = @import("webapi/storage/Cookie.zig").Jar; - const http = @import("../network/http.zig"); const Network = @import("../network/Network.zig"); + const Robots = @import("../network/Robots.zig"); const Cache = @import("../network/cache/Cache.zig"); -const CacheMetadata = Cache.CachedMetadata; const CachedResponse = Cache.CachedResponse; +const WebBotAuth = @import("../network/WebBotAuth.zig"); const IS_DEBUG = builtin.mode == .Debug; @@ -43,89 +42,953 @@ pub const Headers = http.Headers; pub const ResponseHead = http.ResponseHead; pub const HeaderIterator = http.HeaderIterator; -// This is loosely tied to a browser Page. Loading all the , doing -// XHR requests, and loading imports all happens through here. Sine the app -// currently supports 1 browser and 1 page at-a-time, we only have 1 Client and -// re-use it from page to page. 
This allows us better re-use of the various -// buffers/caches (including keepalive connections) that libcurl has. -// -// The app has other secondary http needs, like telemetry. While we want to -// share some things (namely the ca blob, and maybe some configuration -// (TODO: ??? should proxy settings be global ???)), we're able to do call -// client.abort() to abort the transfers being made by a page, without impacting -// those other http requests. -pub const Client = @This(); +pub const CacheLayer = @import("../network/layer/CacheLayer.zig"); -// Count of active requests -active: usize = 0, +pub const PerformStatus = enum { cdp_socket, normal }; -// Count of intercepted requests. This is to help deal with intercepted requests. -// The client doesn't track intercepted transfers. If a request is intercepted, -// the client forgets about it and requires the interceptor to continue or abort -// it. That works well, except if we only rely on active, we might think there's -// no more network activity when, with interecepted requests, there might be more -// in the future. (We really only need this to properly emit a 'networkIdle' and -// 'networkAlmostIdle' Page.lifecycleEvent in CDP). -intercepted: usize = 0, +pub const Transport = struct { + active: usize = 0, -// Our curl multi handle. -handles: http.Handles, + // Count of intercepted requests. This is to help deal with intercepted + // requests. The client doesn't track intercepted transfers. If a request + // is intercepted, the client forgets about it and requires the interceptor + // to continue or abort it. That works well, except if we only rely on + // active, we might think there's no more network activity when, with + // intercepted requests, there might be more in the future. (We really only + // need this to properly emit a 'networkIdle' and 'networkAlmostIdle' + // Page.lifecycleEvent in CDP). + intercepted: usize = 0, -// Connections currently in this client's curl_multi. 
-in_use: std.DoublyLinkedList = .{}, + handles: http.Handles, + in_use: std.DoublyLinkedList = .{}, + dirty: std.DoublyLinkedList = .{}, + performing: bool = false, + next_request_id: u32 = 0, + queue: std.DoublyLinkedList = .{}, + allocator: Allocator, + network: *Network, + transfer_pool: std.heap.MemoryPool(Transfer), + http_proxy: ?[:0]const u8 = null, + use_proxy: bool, + tls_verify: bool = true, + cdp_client: ?CDPClient = null, -// Connections that failed to be removed from curl_multi during perform. -dirty: std.DoublyLinkedList = .{}, + pub fn init(allocator: Allocator, network: *Network) !*Transport { + var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator); + errdefer transfer_pool.deinit(); -// Whether we're currently inside a curl_multi_perform call. -performing: bool = false, + const t = try allocator.create(Transport); + errdefer allocator.destroy(t); -// Use to generate the next request ID -next_request_id: u32 = 0, + var handles = try http.Handles.init(network.config); + errdefer handles.deinit(); -// When handles has no more available easys, requests get queued. -queue: std.DoublyLinkedList = .{}, + const http_proxy = network.config.httpProxy(); + t.* = .{ + .handles = handles, + .network = network, + .allocator = allocator, + .transfer_pool = transfer_pool, + .use_proxy = http_proxy != null, + .http_proxy = http_proxy, + .tls_verify = network.config.tlsVerifyHost(), + }; + return t; + } -// The main app allocator -allocator: Allocator, + pub fn deinit(self: *Transport) void { + self.abort(); + self.handles.deinit(); + self.transfer_pool.deinit(); + self.allocator.destroy(self); + } -network: *Network, + pub fn layer(self: *Transport) Layer { + return .{ + .ptr = self, + .vtable = &.{ .request = _request }, + }; + } -// Queue of requests that depend on a robots.txt. -// Allows us to fetch the robots.txt just once. 
-pending_robots_queue: std.StringHashMapUnmanaged(std.ArrayList(Request)) = .empty, + pub fn setTlsVerify(self: *Transport, verify: bool) !void { + var it = self.in_use.first; + while (it) |node| : (it = node.next) { + const conn: *http.Connection = @fieldParentPtr("node", node); + try conn.setTlsVerify(verify, self.use_proxy); + } + self.tls_verify = verify; + } -// Once we have a handle/easy to process a request with, we create a Transfer -// which contains the Request as well as any state we need to process the -// request. These wil come and go with each request. -transfer_pool: std.heap.MemoryPool(Transfer), + pub fn changeProxy(self: *Transport, proxy: ?[:0]const u8) !void { + try self.ensureNoActiveConnection(); + self.http_proxy = proxy orelse self.network.config.httpProxy(); + self.use_proxy = self.http_proxy != null; + } -// The current proxy. CDP can change it, changeProxy(null) restores -// from config. -http_proxy: ?[:0]const u8 = null, + pub fn restoreOriginalProxy(self: *Transport) !void { + try self.ensureNoActiveConnection(); + self.http_proxy = self.network.config.httpProxy(); + self.use_proxy = self.http_proxy != null; + } -// track if the client use a proxy for connections. -// We can't use http_proxy because we want also to track proxy configured via -// CDP. -use_proxy: bool, + pub fn newHeaders(self: *const Transport) !http.Headers { + return http.Headers.init(self.network.config.http_headers.user_agent_header); + } -// Current TLS verification state, applied per-connection in makeRequest. 
-tls_verify: bool = true, + pub fn abort(self: *Transport) void { + self._abort(true, 0); + } -obey_robots: bool, + pub fn abortFrame(self: *Transport, frame_id: u32) void { + self._abort(false, frame_id); + } -cdp_client: ?CDPClient = null, + fn _abort(self: *Transport, comptime abort_all: bool, frame_id: u32) void { + { + var n = self.in_use.first; + while (n) |node| { + n = node.next; + const conn: *http.Connection = @fieldParentPtr("node", node); + var transfer = Transfer.fromConnection(conn) catch |err| { + self.removeConn(conn); + log.err(.http, "get private info", .{ .err = err, .source = "abort" }); + continue; + }; + if (comptime abort_all) { + transfer.kill(); + } else if (transfer.req.frame_id == frame_id) { + transfer.kill(); + } + } + } + + { + var q = &self.queue; + var n = q.first; + while (n) |node| { + n = node.next; + const transfer: *Transfer = @fieldParentPtr("_node", node); + if (comptime abort_all) { + transfer.kill(); + } else if (transfer.req.frame_id == frame_id) { + q.remove(node); + transfer.kill(); + } + } + } + + if (comptime abort_all) { + self.queue = .{}; + } + + if (comptime IS_DEBUG and abort_all) { + // Even after an abort_all, we could still have transfers, but, at + // the very least, they should all be flagged as aborted. 
+ var it = self.in_use.first; + var leftover: usize = 0; + while (it) |node| : (it = node.next) { + const conn: *http.Connection = @fieldParentPtr("node", node); + std.debug.assert((Transfer.fromConnection(conn) catch unreachable).aborted); + leftover += 1; + } + std.debug.assert(self.active == leftover); + } + } + + pub fn tick(self: *Transport, timeout_ms: u32) !PerformStatus { + while (self.queue.popFirst()) |queue_node| { + const conn = self.network.getConnection() orelse { + self.queue.prepend(queue_node); + break; + }; + try self.makeRequest(conn, @fieldParentPtr("_node", queue_node)); + } + return self.perform(@intCast(timeout_ms)); + } + + /// Core entry point: interception gating then queues/starts the transfer. + /// Robots and cache checks are done by layers above. + pub fn _request(ptr: *anyopaque, _: Context, req: Request) !void { + const self: *Transport = @ptrCast(@alignCast(ptr)); + const transfer = try self.makeTransfer(req); + + transfer.req.notification.dispatch(.http_request_start, &.{ .transfer = transfer }); + + var wait_for_interception = false; + transfer.req.notification.dispatch(.http_request_intercept, &.{ + .transfer = transfer, + .wait_for_interception = &wait_for_interception, + }); + if (wait_for_interception == false) { + return self.process(transfer); + } + + self.intercepted += 1; + if (comptime IS_DEBUG) { + log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted }); + } + transfer._intercept_state = .pending; + + if (req.blocking == false) { + return; + } + if (try self.waitForInterceptedResponse(transfer)) { + return self.process(transfer); + } + } + + fn waitForInterceptedResponse(self: *Transport, transfer: *Transfer) !bool { + const cdp_client = self.cdp_client.?; + const ctx = cdp_client.ctx; + + if (cdp_client.blocking_read_start(ctx) == false) { + return error.BlockingInterceptFailure; + } + defer _ = cdp_client.blocking_read_end(ctx); + + while (true) { + if (cdp_client.blocking_read(ctx) == false) { 
+ return error.BlockingInterceptFailure; + } + switch (transfer._intercept_state) { + .pending => continue, + .@"continue" => return true, + .abort => |err| { + transfer.abort(err); + return false; + }, + .fulfilled => { + transfer.deinit(); + return false; + }, + .not_intercepted => unreachable, + } + } + } + + fn process(self: *Transport, transfer: *Transfer) !void { + if (self.performing == false) { + if (self.network.getConnection()) |conn| { + return self.makeRequest(conn, transfer); + } + } + self.queue.append(&transfer._node); + } + + pub fn continueTransfer(self: *Transport, transfer: *Transfer) !void { + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted }); + } + self.intercepted -= 1; + if (!transfer.req.blocking) { + return self.process(transfer); + } + transfer._intercept_state = .@"continue"; + } + + pub fn abortTransfer(self: *Transport, transfer: *Transfer) void { + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted }); + } + self.intercepted -= 1; + if (!transfer.req.blocking) { + transfer.abort(error.Abort); + } + transfer._intercept_state = .{ .abort = error.Abort }; + } + + pub fn fulfillTransfer(self: *Transport, transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "fulfill transfer", .{ .intercepted = self.intercepted }); + } + self.intercepted -= 1; + try transfer.fulfill(status, headers, body); + if (!transfer.req.blocking) { + transfer.deinit(); + return; + } + transfer._intercept_state = .fulfilled; + } + + pub fn nextReqId(self: *Transport) u32 { + return self.next_request_id +% 1; + } + + pub fn incrReqId(self: *Transport) u32 { + const id = self.next_request_id +% 
1; + self.next_request_id = id; + return id; + } + + fn makeTransfer(self: *Transport, req: Request) !*Transfer { + errdefer req.headers.deinit(); + + const transfer = try self.transfer_pool.create(); + errdefer self.transfer_pool.destroy(transfer); + + const id = self.incrReqId(); + transfer.* = .{ + .arena = ArenaAllocator.init(self.allocator), + .id = id, + .url = req.url, + .req = req, + .client = self, + .max_response_size = self.network.config.httpMaxResponseSize(), + }; + return transfer; + } + + fn makeRequest(self: *Transport, conn: *http.Connection, transfer: *Transfer) anyerror!void { + { + const auth = transfer._auth_challenge; + transfer.reset(); + transfer._auth_challenge = auth; + transfer._conn = conn; + errdefer { + transfer._conn = null; + transfer.deinit(); + self.releaseConn(conn); + } + try transfer.configureConn(conn); + } + + self.in_use.append(&conn.node); + self.handles.add(conn) catch |err| { + transfer._conn = null; + transfer.deinit(); + self.in_use.remove(&conn.node); + self.releaseConn(conn); + return err; + }; + self.active += 1; + + if (transfer.req.start_callback) |cb| { + cb(Response.fromTransfer(transfer)) catch |err| { + transfer.deinit(); + return err; + }; + } + _ = try self.perform(0); + } + + fn perform(self: *Transport, timeout_ms: c_int) anyerror!PerformStatus { + const running = blk: { + self.performing = true; + defer self.performing = false; + break :blk try self.handles.perform(); + }; + + while (self.dirty.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + self.handles.remove(conn) catch |err| { + log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); + @panic("multi_remove_handle"); + }; + self.releaseConn(conn); + } + + if (try self.processMessages()) { + return .normal; + } + + var status = PerformStatus.normal; + if (self.cdp_client) |cdp_client| { + var wait_fds = [_]http.WaitFd{.{ + .fd = cdp_client.socket, + .events = .{ .pollin = true }, + .revents = .{}, 
+ }}; + try self.handles.poll(&wait_fds, timeout_ms); + if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) { + status = .cdp_socket; + } + } else if (running > 0) { + try self.handles.poll(&.{}, timeout_ms); + } + + _ = try self.processMessages(); + return status; + } + + fn processOneMessage(self: *Transport, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool { + // Detect auth challenge from response headers. + // Also check on RecvError: proxy may send 407 with headers before + // closing the connection (CONNECT tunnel not yet established). + if (msg.err == null or msg.err.? == error.RecvError) { + transfer.detectAuthChallenge(&msg.conn); + } + + if (transfer._auth_challenge != null and transfer._tries < 10) { + var wait_for_interception = false; + transfer.req.notification.dispatch( + .http_request_auth_required, + &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception }, + ); + if (wait_for_interception) { + self.intercepted += 1; + if (comptime IS_DEBUG) { + log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted }); + } + transfer._intercept_state = .pending; + transfer.releaseConn(); + if (!transfer.req.blocking) { + return false; + } + if (try self.waitForInterceptedResponse(transfer)) { + self.queue.append(&transfer._node); + } + return false; + } + } + + // Handle redirects: reuse the same connection to preserve TCP state. 
+ if (msg.err == null) { + const status = try msg.conn.getResponseCode(); + if (status >= 300 and status <= 399) { + try transfer.handleRedirect(); + + const conn = transfer._conn.?; + try self.handles.remove(conn); + transfer._conn = null; + transfer._detached_conn = conn; + + transfer.reset(); + try transfer.configureConn(conn); + try self.handles.add(conn); + transfer._detached_conn = null; + transfer._conn = conn; + + _ = try self.perform(0); + return false; + } + } + + // When the server sends "Connection: close" and closes the TLS + // connection without a close_notify alert, BoringSSL reports + // RecvError. If we already received valid HTTP headers, this is + // a normal end-of-body (the connection closure signals the end of + // the response per HTTP/1.1 when there is no Content-Length). We + // must check this before endTransfer, which may reset the easy handle. + const is_conn_close_recv = blk: { + const err = msg.err orelse break :blk false; + if (err != error.RecvError) break :blk false; + const hdr = msg.conn.getResponseHeader("connection", 0) orelse break :blk false; + break :blk std.ascii.eqlIgnoreCase(hdr.value, "close"); + }; + + // Make sure the transfer can't be immediately aborted from a callback + // since we still need it here. + transfer._performing = true; + defer transfer._performing = false; + + if (msg.err != null and !is_conn_close_recv) { + transfer.requestFailed(transfer._callback_error orelse msg.err.?, true); + return true; + } + + if (!transfer._header_done_called) { + // In case of request w/o data, we need to call the header done + // callback now. 
+ const proceed = try transfer.headerDoneCallback(&msg.conn); + if (!proceed) { + transfer.requestFailed(error.Abort, true); + return true; + } + } + + const body = transfer._stream_buffer.items; + + if (transfer._stream_buffer.items.len > 0) { + try transfer.req.data_callback(Response.fromTransfer(transfer), body); + + transfer.req.notification.dispatch(.http_response_data, &.{ + .data = body, + .transfer = transfer, + }); + + if (transfer.aborted) { + transfer.requestFailed(error.Abort, true); + return true; + } + } + + // Release conn ASAP so that it's available; some done_callbacks + // will load more resources. + transfer.releaseConn(); + + try transfer.req.done_callback(transfer.req.ctx); + + transfer.req.notification.dispatch(.http_request_done, &.{ + .transfer = transfer, + }); + + return true; + } + + fn processMessages(self: *Transport) !bool { + var processed = false; + while (self.handles.readMessage()) |msg| { + const transfer = try Transfer.fromConnection(&msg.conn); + const done = self.processOneMessage(msg, transfer) catch |err| blk: { + log.err(.http, "process_messages", .{ .err = err, .req = transfer }); + transfer.requestFailed(err, true); + if (transfer._detached_conn) |c| { + // Conn was removed from handles during redirect reconfiguration + // but not re-added. Release it directly to avoid double-remove. 
+ self.in_use.remove(&c.node); + self.active -= 1; + self.releaseConn(c); + transfer._detached_conn = null; + } + break :blk true; + }; + if (done) { + transfer.deinit(); + processed = true; + } + } + return processed; + } + + fn removeConn(self: *Transport, conn: *http.Connection) void { + self.in_use.remove(&conn.node); + self.active -= 1; + if (self.handles.remove(conn)) { + self.releaseConn(conn); + } else |_| { + self.dirty.append(&conn.node); + } + } + + fn releaseConn(self: *Transport, conn: *http.Connection) void { + self.network.releaseConnection(conn); + } + + fn ensureNoActiveConnection(self: *const Transport) !void { + if (self.active > 0) return error.InflightConnection; + } +}; + +pub const Context = struct { + network: *Network, + + pub fn newHeaders(self: Context) !http.Headers { + return http.Headers.init(self.network.config.http_headers.user_agent_header); + } +}; + +pub const Layer = struct { + ptr: *anyopaque, + vtable: *const VTable, + + pub const VTable = struct { + request: *const fn (*anyopaque, Context, Request) anyerror!void, + }; + + pub fn request(self: Layer, ctx: Context, req: Request) !void { + return self.vtable.request(self.ptr, ctx, req); + } +}; + +// pub const RobotsLayer = struct { +// next: Layer = undefined, +// obey_robots: bool, +// allocator: Allocator, +// pending: std.StringHashMapUnmanaged(std.ArrayList(Request)) = .empty, + +// pub fn layer(self: *RobotsLayer) Layer { +// return .{ +// .ptr = self, +// .vtable = &.{ .request = _request }, +// }; +// } + +// pub fn deinit(self: *RobotsLayer) void { +// var it = self.pending.iterator(); +// while (it.next()) |entry| { +// entry.value_ptr.deinit(self.allocator); +// } +// self.pending.deinit(self.allocator); +// } + +// fn _request(ptr: *anyopaque, ctx: Context, req: Request) anyerror!void { +// const self: *RobotsLayer = @ptrCast(@alignCast(ptr)); + +// if (!self.obey_robots) { +// return self.next.request(ctx, req); +// } + +// const robots_url = try 
URL.getRobotsUrl(self.allocator, req.url); +// errdefer self.allocator.free(robots_url); + +// if (ctx.network.robot_store.get(robots_url)) |robot_entry| { +// defer self.allocator.free(robots_url); +// switch (robot_entry) { +// .present => |robots| { +// const path = URL.getPathname(req.url); +// if (!robots.isAllowed(path)) { +// req.error_callback(req.ctx, error.RobotsBlocked); +// return; +// } +// }, +// .absent => {}, +// } +// return self.next.request(ctx, req); +// } + +// return self.fetchRobotsThenRequest(ctx, robots_url, req); +// } + +// fn fetchRobotsThenRequest(self: *RobotsLayer, ctx: Context, robots_url: [:0]const u8, req: Request) !void { +// const entry = try self.pending.getOrPut(self.allocator, robots_url); + +// if (!entry.found_existing) { +// errdefer self.allocator.free(robots_url); +// entry.value_ptr.* = .empty; + +// const robots_ctx = try self.allocator.create(RobotsContext); +// errdefer self.allocator.destroy(robots_ctx); +// robots_ctx.* = .{ +// .layer = self, +// .ctx = ctx, +// .req = req, +// .robots_url = robots_url, +// .buffer = .empty, +// }; + +// const headers = try ctx.newHeaders(); +// log.debug(.browser, "fetching robots.txt", .{ .robots_url = robots_url }); +// try self.next.request(ctx, .{ +// .ctx = robots_ctx, +// .url = robots_url, +// .method = .GET, +// .headers = headers, +// .blocking = false, +// .frame_id = req.frame_id, +// .cookie_jar = req.cookie_jar, +// .cookie_origin = req.cookie_origin, +// .notification = req.notification, +// .resource_type = .fetch, +// .header_callback = RobotsContext.headerCallback, +// .data_callback = RobotsContext.dataCallback, +// .done_callback = RobotsContext.doneCallback, +// .error_callback = RobotsContext.errorCallback, +// .shutdown_callback = RobotsContext.shutdownCallback, +// }); +// } else { +// self.allocator.free(robots_url); +// } + +// try entry.value_ptr.append(self.allocator, req); +// } + +// fn flushPending(self: *RobotsLayer, ctx: Context, robots_url: 
[:0]const u8, allowed: bool) void { +// var queued = self.pending.fetchRemove(robots_url) orelse +// @panic("RobotsLayer.flushPending: missing queue"); +// defer queued.value.deinit(self.allocator); + +// for (queued.value.items) |queued_req| { +// if (!allowed) { +// log.warn(.http, "blocked by robots", .{ .url = queued_req.url }); +// queued_req.error_callback(queued_req.ctx, error.RobotsBlocked); +// } else { +// self.next.request(ctx, queued_req) catch |e| { +// queued_req.error_callback(queued_req.ctx, e); +// }; +// } +// } +// } + +// fn flushPendingShutdown(self: *RobotsLayer, robots_url: [:0]const u8) void { +// var queued = self.pending.fetchRemove(robots_url) orelse +// @panic("RobotsLayer.flushPendingShutdown: missing queue"); +// defer queued.value.deinit(self.allocator); + +// for (queued.value.items) |queued_req| { +// if (queued_req.shutdown_callback) |cb| cb(queued_req.ctx); +// } +// } +// }; + +// const RobotsContext = struct { +// layer: *RobotsLayer, +// ctx: Context, +// req: Request, +// robots_url: [:0]const u8, +// buffer: std.ArrayList(u8), +// status: u16 = 0, + +// fn deinit(self: *RobotsContext) void { +// self.layer.allocator.free(self.robots_url); +// self.buffer.deinit(self.layer.allocator); +// self.layer.allocator.destroy(self); +// } + +// fn headerCallback(response: Response) !bool { +// const self: *RobotsContext = @ptrCast(@alignCast(response.ctx)); +// // Robots callbacks only happen on real live requests. 
+// const transfer = response.inner.transfer; +// if (transfer.response_header) |hdr| { +// log.debug(.browser, "robots status", .{ .status = hdr.status, .robots_url = self.robots_url }); +// self.status = hdr.status; +// } +// if (transfer.getContentLength()) |cl| { +// try self.buffer.ensureTotalCapacity(self.layer.allocator, cl); +// } +// return true; +// } + +// fn dataCallback(response: Response, data: []const u8) !void { +// const self: *RobotsContext = @ptrCast(@alignCast(response.ctx)); +// try self.buffer.appendSlice(self.layer.allocator, data); +// } + +// fn doneCallback(ctx_ptr: *anyopaque) !void { +// const self: *RobotsContext = @ptrCast(@alignCast(ctx_ptr)); +// defer self.deinit(); + +// var allowed = true; +// const network = self.ctx.network; + +// switch (self.status) { +// 200 => { +// if (self.buffer.items.len > 0) { +// const robots: ?Robots = network.robot_store.robotsFromBytes( +// network.config.http_headers.user_agent, +// self.buffer.items, +// ) catch blk: { +// log.warn(.browser, "failed to parse robots", .{ .robots_url = self.robots_url }); +// try network.robot_store.putAbsent(self.robots_url); +// break :blk null; +// }; +// if (robots) |r| { +// try network.robot_store.put(self.robots_url, r); +// const path = URL.getPathname(self.req.url); +// allowed = r.isAllowed(path); +// } +// } +// }, +// 404 => { +// log.debug(.http, "robots not found", .{ .url = self.robots_url }); +// try network.robot_store.putAbsent(self.robots_url); +// }, +// else => { +// log.debug(.http, "unexpected status on robots", .{ +// .url = self.robots_url, +// .status = self.status, +// }); +// try network.robot_store.putAbsent(self.robots_url); +// }, +// } + +// self.layer.flushPending(self.ctx, self.robots_url, allowed); +// } + +// fn errorCallback(ctx_ptr: *anyopaque, err: anyerror) void { +// const self: *RobotsContext = @ptrCast(@alignCast(ctx_ptr)); +// defer self.deinit(); +// log.warn(.http, "robots fetch failed", .{ .err = err }); +// // On 
error, allow all queued requests to proceed. +// self.layer.flushPending(self.ctx, self.robots_url, true); +// } + +// fn shutdownCallback(ctx_ptr: *anyopaque) void { +// const self: *RobotsContext = @ptrCast(@alignCast(ctx_ptr)); +// defer self.deinit(); +// log.debug(.http, "robots fetch shutdown", .{}); +// self.layer.flushPendingShutdown(self.robots_url); +// } +// }; + +// pub const WebBotAuthLayer = struct { +// next: Layer = undefined, +// allocator: std.mem.Allocator, +// // null when web_bot_auth is not configured +// auth: ?*WebBotAuth, + +// pub fn deinit(self: *WebBotAuthLayer) void { +// if (self.auth) |wba| wba.deinit(self.allocator); +// } + +// pub fn layer(self: *WebBotAuthLayer) Layer { +// return .{ +// .ptr = self, +// .vtable = &.{ .request = _request }, +// }; +// } + +// fn _request(ptr: *anyopaque, ctx: Context, req: Request) anyerror!void { +// const self: *WebBotAuthLayer = @ptrCast(@alignCast(ptr)); +// var our_req = req; +// if (self.auth) |auth| { +// const authority = URL.getHost(req.url); +// try auth.signRequest(ctx.arena, &our_req.headers, authority); +// } +// return self.next.request(ctx, our_req); +// } +// }; + +pub fn LayerStack(comptime layer_types: anytype) type { + return struct { + ptrs: [layer_types.len]*anyopaque, + layers: [layer_types.len]Layer, + + const Self = @This(); + + pub fn init(allocator: Allocator, transport: *Transport, instances: anytype) !Self { + var ptrs: [layer_types.len]*anyopaque = undefined; + var layers: [layer_types.len]Layer = undefined; + + inline for (layer_types, 0..) |T, i| { + const ptr = try allocator.create(T); + ptr.* = instances[i]; + ptrs[i] = ptr; + layers[i] = ptr.layer(); + } + + // Wire inner: each layer's `next` points to the next layer. + // Done after all layers are created so pointers are stable. + inline for (layer_types, 0..) 
|T, i| { + if (@hasField(T, "next")) { + const ptr: *T = @ptrCast(@alignCast(ptrs[i])); + if (i + 1 < layer_types.len) { + ptr.next = layers[i + 1]; + } else { + ptr.next = transport.layer(); + } + } + } + + return .{ .ptrs = ptrs, .layers = layers }; + } + + pub fn deinit(self: *Self, allocator: Allocator) void { + inline for (layer_types, 0..) |T, i| { + const ptr: *T = @ptrCast(@alignCast(self.ptrs[i])); + if (@hasDecl(T, "deinit")) ptr.deinit(); + allocator.destroy(ptr); + } + } + + pub fn top(self: *Self) Layer { + return self.layers[0]; + } + }; +} + +// pub const Layers = LayerStack(.{ RobotsLayer, WebBotAuthLayer, CacheLayer }); +pub const Layers = LayerStack(.{CacheLayer}); + +const Client = @This(); + +transport: *Transport, +layers: Layers, + +pub fn init(allocator: Allocator, network: *Network) !*Client { + const transport = try Transport.init(allocator, network); + errdefer transport.deinit(); + + var layers = try Layers.init(allocator, transport, .{ + // RobotsLayer{ + // .obey_robots = network.config.obeyRobots(), + // .allocator = allocator, + // .pending = .empty, + // }, + // WebBotAuthLayer{ + // .auth = if (network.web_bot_auth) |*wba| wba else null, + // .allocator = allocator, + // }, + CacheLayer{}, + }); + errdefer layers.deinit(allocator); + + const client = try allocator.create(Client); + errdefer allocator.destroy(client); + + client.* = .{ .transport = transport, .layers = layers }; + return client; +} + +pub fn deinit(self: *Client) void { + const allocator = self.transport.allocator; + self.layers.deinit(allocator); + self.transport.deinit(); + allocator.destroy(self); +} + +pub fn setTlsVerify(self: *Client, verify: bool) !void { + return self.transport.setTlsVerify(verify); +} + +pub fn changeProxy(self: *Client, proxy: ?[:0]const u8) !void { + return self.transport.changeProxy(proxy); +} + +pub fn restoreOriginalProxy(self: *Client) !void { + return self.transport.restoreOriginalProxy(); +} + +pub fn newHeaders(self: *Client) 
!http.Headers { + return self.transport.newHeaders(); +} + +pub fn abort(self: *Client) void { + self.transport.abort(); +} + +pub fn abortFrame(self: *Client, frame_id: u32) void { + self.transport.abortFrame(frame_id); +} + +pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { + return self.transport.tick(timeout_ms); +} + +/// Dispatch a request through the full middleware stack. +pub fn request(self: *Client, req: Request) !void { + const ctx = Context{ .network = self.transport.network }; + return self.layers.top().request(ctx, req); +} + +pub fn continueTransfer(self: *Client, transfer: *Transfer) !void { + return self.transport.continueTransfer(transfer); +} + +pub fn abortTransfer(self: *Client, transfer: *Transfer) void { + return self.transport.abortTransfer(transfer); +} + +pub fn fulfillTransfer(self: *Client, transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { + return self.transport.fulfillTransfer(transfer, status, headers, body); +} + +pub fn nextReqId(self: *Client) u32 { + return self.transport.nextReqId(); +} + +pub fn incrReqId(self: *Client) u32 { + return self.transport.incrReqId(); +} + +pub fn setCdpClient(self: *Client, cdp_client: ?CDPClient) void { + self.transport.cdp_client = cdp_client; +} + +pub fn cdpClient(self: *Client) ?*CDPClient { + return &self.transport.cdp_client; +} + +pub fn active(self: *Client) usize { + return self.transport.active; +} + +pub fn intercepted(self: *Client) usize { + return self.transport.intercepted; +} -// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll -// both HTTP data as well as messages from an CDP connection. -// Furthermore, we have some tension between blocking scripts and request -// interception. For non-blocking scripts, because nothing blocks, we can -// just queue the scripts until we receive a response to the interception -// notification. 
But for blocking scripts (which block the parser), it's hard -// to return control back to the CDP loop. So the `read` function pointer is -// used by the Client to have the CDP client read more data from the socket, -// specifically when we're waiting for a request interception response to -// a blocking script. pub const CDPClient = struct { socket: posix.socket_t, ctx: *anyopaque, @@ -134,901 +997,6 @@ pub const CDPClient = struct { blocking_read_end: *const fn (*anyopaque) bool, }; -pub fn init(allocator: Allocator, network: *Network) !*Client { - var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator); - errdefer transfer_pool.deinit(); - - const client = try allocator.create(Client); - errdefer allocator.destroy(client); - - var handles = try http.Handles.init(network.config); - errdefer handles.deinit(); - - const http_proxy = network.config.httpProxy(); - - client.* = .{ - .handles = handles, - .network = network, - .allocator = allocator, - .transfer_pool = transfer_pool, - - .use_proxy = http_proxy != null, - .http_proxy = http_proxy, - .tls_verify = network.config.tlsVerifyHost(), - .obey_robots = network.config.obeyRobots(), - }; - - return client; -} - -pub fn deinit(self: *Client) void { - self.abort(); - self.handles.deinit(); - - self.transfer_pool.deinit(); - - var robots_iter = self.pending_robots_queue.iterator(); - while (robots_iter.next()) |entry| { - entry.value_ptr.deinit(self.allocator); - } - self.pending_robots_queue.deinit(self.allocator); - - self.allocator.destroy(self); -} - -// Enable TLS verification on all connections. -pub fn setTlsVerify(self: *Client, verify: bool) !void { - // Remove inflight connections check on enable TLS b/c chromiumoxide calls - // the command during navigate and Curl seems to accept it... 
- - var it = self.in_use.first; - while (it) |node| : (it = node.next) { - const conn: *http.Connection = @fieldParentPtr("node", node); - try conn.setTlsVerify(verify, self.use_proxy); - } - self.tls_verify = verify; -} - -// Restrictive since it'll only work if there are no inflight requests. In some -// cases, the libcurl documentation is clear that changing settings while a -// connection is inflight is undefined. It doesn't say anything about CURLOPT_PROXY, -// but better to be safe than sorry. -// For now, this restriction is ok, since it's only called by CDP on -// createBrowserContext, at which point, if we do have an active connection, -// that's probably a bug (a previous abort failed?). But if we need to call this -// at any point in time, it could be worth digging into libcurl to see if this -// can be changed at any point in the easy's lifecycle. -pub fn changeProxy(self: *Client, proxy: ?[:0]const u8) !void { - try self.ensureNoActiveConnection(); - self.http_proxy = proxy orelse self.network.config.httpProxy(); - self.use_proxy = self.http_proxy != null; -} - -pub fn newHeaders(self: *const Client) !http.Headers { - return http.Headers.init(self.network.config.http_headers.user_agent_header); -} - -pub fn abort(self: *Client) void { - self._abort(true, 0); -} - -pub fn abortFrame(self: *Client, frame_id: u32) void { - self._abort(false, frame_id); -} - -// Written this way so that both abort and abortFrame can share the same code -// but abort can avoid the frame_id check at comptime. 
-fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { - { - var n = self.in_use.first; - while (n) |node| { - n = node.next; - const conn: *http.Connection = @fieldParentPtr("node", node); - var transfer = Transfer.fromConnection(conn) catch |err| { - // Let's cleanup what we can - self.removeConn(conn); - log.err(.http, "get private info", .{ .err = err, .source = "abort" }); - continue; - }; - if (comptime abort_all) { - transfer.kill(); - } else if (transfer.req.frame_id == frame_id) { - transfer.kill(); - } - } - } - - { - var q = &self.queue; - var n = q.first; - while (n) |node| { - n = node.next; - const transfer: *Transfer = @fieldParentPtr("_node", node); - if (comptime abort_all) { - transfer.kill(); - } else if (transfer.req.frame_id == frame_id) { - q.remove(node); - transfer.kill(); - } - } - } - - if (comptime abort_all) { - self.queue = .{}; - } - - if (comptime IS_DEBUG and abort_all) { - // Even after an abort_all, we could still have transfers, but, at the - // very least, they should all be flagged as aborted. 
- var it = self.in_use.first; - var leftover: usize = 0; - while (it) |node| : (it = node.next) { - const conn: *http.Connection = @fieldParentPtr("node", node); - std.debug.assert((Transfer.fromConnection(conn) catch unreachable).aborted); - leftover += 1; - } - std.debug.assert(self.active == leftover); - } -} - -pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { - while (self.queue.popFirst()) |queue_node| { - const conn = self.network.getConnection() orelse { - self.queue.prepend(queue_node); - break; - }; - - try self.makeRequest(conn, @fieldParentPtr("_node", queue_node)); - } - - return self.perform(@intCast(timeout_ms)); -} - -pub fn request(self: *Client, req: Request) !void { - if (self.obey_robots == false) { - return self.processRequest(req); - } - - const robots_url = try URL.getRobotsUrl(self.allocator, req.url); - errdefer self.allocator.free(robots_url); - - // If we have this robots cached, we can take a fast path. - if (self.network.robot_store.get(robots_url)) |robot_entry| { - defer self.allocator.free(robots_url); - - switch (robot_entry) { - // If we have a found robots entry, we check it. - .present => |robots| { - const path = URL.getPathname(req.url); - if (!robots.isAllowed(path)) { - req.error_callback(req.ctx, error.RobotsBlocked); - return; - } - }, - // Otherwise, we assume we won't find it again. 
- .absent => {}, - } - - return self.processRequest(req); - } - return self.fetchRobotsThenProcessRequest(robots_url, req); -} - -fn serveFromCache(req: Request, cached: *const CachedResponse) !void { - const response = Response.fromCached(req.ctx, cached); - defer switch (cached.data) { - .buffer => |_| {}, - .file => |f| f.file.close(), - }; - - if (req.start_callback) |cb| { - try cb(response); - } - - const proceed = try req.header_callback(response); - if (!proceed) { - req.error_callback(req.ctx, error.Abort); - return; - } - - switch (cached.data) { - .buffer => |data| { - if (data.len > 0) { - try req.data_callback(response, data); - } - }, - .file => |f| { - const file = f.file; - - var buf: [1024]u8 = undefined; - var file_reader = file.reader(&buf); - try file_reader.seekTo(f.offset); - const reader = &file_reader.interface; - - var read_buf: [1024]u8 = undefined; - var remaining = f.len; - - while (remaining > 0) { - const read_len = @min(read_buf.len, remaining); - const n = try reader.readSliceShort(read_buf[0..read_len]); - if (n == 0) break; - remaining -= n; - try req.data_callback(response, read_buf[0..n]); - } - }, - } - - try req.done_callback(req.ctx); -} - -fn processRequest(self: *Client, req: Request) !void { - if (self.network.cache) |*cache| { - if (req.method == .GET) { - const arena = try self.network.app.arena_pool.acquire(.{ .debug = "HttpClient.processRequest.cache" }); - defer self.network.app.arena_pool.release(arena); - - var iter = req.headers.iterator(); - const req_header_list = try iter.collect(arena); - - if (cache.get(arena, .{ - .url = req.url, - .timestamp = std.time.timestamp(), - .request_headers = req_header_list.items, - })) |cached| { - defer req.headers.deinit(); - return serveFromCache(req, &cached); - } - } - } - - const transfer = try self.makeTransfer(req); - - transfer.req.notification.dispatch(.http_request_start, &.{ .transfer = transfer }); - - var wait_for_interception = false; - 
transfer.req.notification.dispatch(.http_request_intercept, &.{ - .transfer = transfer, - .wait_for_interception = &wait_for_interception, - }); - if (wait_for_interception == false) { - // request not intercepted, process it normally - return self.process(transfer); - } - - self.intercepted += 1; - if (comptime IS_DEBUG) { - log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted }); - } - transfer._intercept_state = .pending; - - if (req.blocking == false) { - // The request was interecepted, but it isn't a blocking request, so we - // dont' need to block this call. The request will be unblocked - // asynchronously via either continueTransfer or abortTransfer - return; - } - - if (try self.waitForInterceptedResponse(transfer)) { - return self.process(transfer); - } -} - -const RobotsRequestContext = struct { - client: *Client, - req: Request, - robots_url: [:0]const u8, - buffer: std.ArrayList(u8), - status: u16 = 0, - - pub fn deinit(self: *RobotsRequestContext) void { - self.client.allocator.free(self.robots_url); - self.buffer.deinit(self.client.allocator); - self.client.allocator.destroy(self); - } -}; - -fn fetchRobotsThenProcessRequest(self: *Client, robots_url: [:0]const u8, req: Request) !void { - const entry = try self.pending_robots_queue.getOrPut(self.allocator, robots_url); - - if (!entry.found_existing) { - errdefer self.allocator.free(robots_url); - - // If we aren't already fetching this robots, - // we want to create a new queue for it and add this request into it. 
- entry.value_ptr.* = .empty; - - const ctx = try self.allocator.create(RobotsRequestContext); - errdefer self.allocator.destroy(ctx); - ctx.* = .{ .client = self, .req = req, .robots_url = robots_url, .buffer = .empty }; - const headers = try self.newHeaders(); - - log.debug(.browser, "fetching robots.txt", .{ .robots_url = robots_url }); - try self.processRequest(.{ - .ctx = ctx, - .url = robots_url, - .method = .GET, - .headers = headers, - .blocking = false, - .frame_id = req.frame_id, - .cookie_jar = req.cookie_jar, - .cookie_origin = req.cookie_origin, - .notification = req.notification, - .resource_type = .fetch, - .header_callback = robotsHeaderCallback, - .data_callback = robotsDataCallback, - .done_callback = robotsDoneCallback, - .error_callback = robotsErrorCallback, - .shutdown_callback = robotsShutdownCallback, - }); - } else { - // Not using our own robots URL, only using the one from the first request. - self.allocator.free(robots_url); - } - - try entry.value_ptr.append(self.allocator, req); -} - -fn robotsHeaderCallback(response: Response) !bool { - const ctx: *RobotsRequestContext = @ptrCast(@alignCast(response.ctx)); - // Robots callbacks only happen on real live requests. 
- const transfer = response.inner.transfer; - - if (transfer.response_header) |hdr| { - log.debug(.browser, "robots status", .{ .status = hdr.status, .robots_url = ctx.robots_url }); - ctx.status = hdr.status; - } - - if (transfer.getContentLength()) |cl| { - try ctx.buffer.ensureTotalCapacity(ctx.client.allocator, cl); - } - - return true; -} - -fn robotsDataCallback(response: Response, data: []const u8) !void { - const ctx: *RobotsRequestContext = @ptrCast(@alignCast(response.ctx)); - try ctx.buffer.appendSlice(ctx.client.allocator, data); -} - -fn robotsDoneCallback(ctx_ptr: *anyopaque) !void { - const ctx: *RobotsRequestContext = @ptrCast(@alignCast(ctx_ptr)); - defer ctx.deinit(); - - var allowed = true; - - switch (ctx.status) { - 200 => { - if (ctx.buffer.items.len > 0) { - const robots: ?Robots = ctx.client.network.robot_store.robotsFromBytes( - ctx.client.network.config.http_headers.user_agent, - ctx.buffer.items, - ) catch blk: { - log.warn(.browser, "failed to parse robots", .{ .robots_url = ctx.robots_url }); - // If we fail to parse, we just insert it as absent and ignore. - try ctx.client.network.robot_store.putAbsent(ctx.robots_url); - break :blk null; - }; - - if (robots) |r| { - try ctx.client.network.robot_store.put(ctx.robots_url, r); - const path = URL.getPathname(ctx.req.url); - allowed = r.isAllowed(path); - } - } - }, - 404 => { - log.debug(.http, "robots not found", .{ .url = ctx.robots_url }); - // If we get a 404, we just insert it as absent. - try ctx.client.network.robot_store.putAbsent(ctx.robots_url); - }, - else => { - log.debug(.http, "unexpected status on robots", .{ .url = ctx.robots_url, .status = ctx.status }); - // If we get an unexpected status, we just insert as absent. 
- try ctx.client.network.robot_store.putAbsent(ctx.robots_url); - }, - } - - var queued = ctx.client.pending_robots_queue.fetchRemove( - ctx.robots_url, - ) orelse @panic("Client.robotsDoneCallbacke empty queue"); - defer queued.value.deinit(ctx.client.allocator); - - for (queued.value.items) |queued_req| { - if (!allowed) { - log.warn(.http, "blocked by robots", .{ .url = queued_req.url }); - queued_req.error_callback(queued_req.ctx, error.RobotsBlocked); - } else { - ctx.client.processRequest(queued_req) catch |e| { - queued_req.error_callback(queued_req.ctx, e); - }; - } - } -} - -fn robotsErrorCallback(ctx_ptr: *anyopaque, err: anyerror) void { - const ctx: *RobotsRequestContext = @ptrCast(@alignCast(ctx_ptr)); - defer ctx.deinit(); - - log.warn(.http, "robots fetch failed", .{ .err = err }); - - var queued = ctx.client.pending_robots_queue.fetchRemove( - ctx.robots_url, - ) orelse @panic("Client.robotsErrorCallback empty queue"); - defer queued.value.deinit(ctx.client.allocator); - - // On error, allow all queued requests to proceed - for (queued.value.items) |queued_req| { - ctx.client.processRequest(queued_req) catch |e| { - queued_req.error_callback(queued_req.ctx, e); - }; - } -} - -fn robotsShutdownCallback(ctx_ptr: *anyopaque) void { - const ctx: *RobotsRequestContext = @ptrCast(@alignCast(ctx_ptr)); - defer ctx.deinit(); - - log.debug(.http, "robots fetch shutdown", .{}); - - var queued = ctx.client.pending_robots_queue.fetchRemove( - ctx.robots_url, - ) orelse @panic("Client.robotsErrorCallback empty queue"); - defer queued.value.deinit(ctx.client.allocator); - - for (queued.value.items) |queued_req| { - if (queued_req.shutdown_callback) |shutdown_cb| { - shutdown_cb(queued_req.ctx); - } - } -} - -fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool { - // The request was intercepted and is blocking. 
This is messy, but our - // callers, the ScriptManager -> Page, don't have a great way to stop the - // parser and return control to the CDP server to wait for the interception - // response. We have some information on the CDPClient, so we'll do the - // blocking here. (This is a bit of a legacy thing. Initially the Client - // had a 'extra_socket' that it could monitor. It was named 'extra_socket' - // to appear generic, but really, that 'extra_socket' was always the CDP - // socket. Because we already had the "extra_socket" here, it was easier to - // make it even more CDP- aware and turn `extra_socket: socket_t` into the - // current CDPClient and do the blocking here). - const cdp_client = self.cdp_client.?; - const ctx = cdp_client.ctx; - - if (cdp_client.blocking_read_start(ctx) == false) { - return error.BlockingInterceptFailure; - } - - defer _ = cdp_client.blocking_read_end(ctx); - - while (true) { - if (cdp_client.blocking_read(ctx) == false) { - return error.BlockingInterceptFailure; - } - - switch (transfer._intercept_state) { - .pending => continue, // keep waiting - .@"continue" => return true, - .abort => |err| { - transfer.abort(err); - return false; - }, - .fulfilled => { - // callbacks already called, just need to cleanups - transfer.deinit(); - return false; - }, - .not_intercepted => unreachable, - } - } -} - -// Above, request will not process if there's an interception request. In such -// cases, the interecptor is expected to call resume to continue the transfer -// or transfer.abort() to abort it. -fn process(self: *Client, transfer: *Transfer) !void { - // libcurl doesn't allow recursive calls, if we're in a `perform()` operation - // then we _have_ to queue this. 
- if (self.performing == false) { - if (self.network.getConnection()) |conn| { - return self.makeRequest(conn, transfer); - } - } - - self.queue.append(&transfer._node); -} - -// For an intercepted request -pub fn continueTransfer(self: *Client, transfer: *Transfer) !void { - if (comptime IS_DEBUG) { - std.debug.assert(transfer._intercept_state != .not_intercepted); - log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted }); - } - self.intercepted -= 1; - - if (!transfer.req.blocking) { - return self.process(transfer); - } - transfer._intercept_state = .@"continue"; -} - -// For an intercepted request -pub fn abortTransfer(self: *Client, transfer: *Transfer) void { - if (comptime IS_DEBUG) { - std.debug.assert(transfer._intercept_state != .not_intercepted); - log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted }); - } - self.intercepted -= 1; - - if (!transfer.req.blocking) { - transfer.abort(error.Abort); - } - transfer._intercept_state = .{ .abort = error.Abort }; -} - -// For an intercepted request -pub fn fulfillTransfer(self: *Client, transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { - if (comptime IS_DEBUG) { - std.debug.assert(transfer._intercept_state != .not_intercepted); - log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted }); - } - self.intercepted -= 1; - - try transfer.fulfill(status, headers, body); - if (!transfer.req.blocking) { - transfer.deinit(); - return; - } - transfer._intercept_state = .fulfilled; -} - -pub fn nextReqId(self: *Client) u32 { - return self.next_request_id +% 1; -} - -pub fn incrReqId(self: *Client) u32 { - const id = self.next_request_id +% 1; - self.next_request_id = id; - return id; -} - -fn makeTransfer(self: *Client, req: Request) !*Transfer { - errdefer req.headers.deinit(); - - const transfer = try self.transfer_pool.create(); - errdefer self.transfer_pool.destroy(transfer); - - const id = self.incrReqId(); - transfer.* = .{ 
- .arena = ArenaAllocator.init(self.allocator), - .id = id, - .url = req.url, - .req = req, - .client = self, - .max_response_size = self.network.config.httpMaxResponseSize(), - }; - return transfer; -} - -fn requestFailed(transfer: *Transfer, err: anyerror, comptime execute_callback: bool) void { - if (transfer._notified_fail) { - // we can force a failed request within a callback, which will eventually - // result in this being called again in the more general loop. We do this - // because we can raise a more specific error inside a callback in some cases - return; - } - - transfer._notified_fail = true; - - transfer.req.notification.dispatch(.http_request_fail, &.{ - .transfer = transfer, - .err = err, - }); - - if (execute_callback) { - transfer.req.error_callback(transfer.req.ctx, err); - } else if (transfer.req.shutdown_callback) |cb| { - cb(transfer.req.ctx); - } -} - -// Same restriction as changeProxy. Should be ok since this is only called on -// BrowserContext deinit. -pub fn restoreOriginalProxy(self: *Client) !void { - try self.ensureNoActiveConnection(); - - self.http_proxy = self.network.config.httpProxy(); - self.use_proxy = self.http_proxy != null; -} - -fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyerror!void { - { - // Reset per-response state for retries (auth challenge, queue). - const auth = transfer._auth_challenge; - transfer.reset(); - transfer._auth_challenge = auth; - - transfer._conn = conn; - errdefer { - transfer._conn = null; - transfer.deinit(); - self.releaseConn(conn); - } - - try transfer.configureConn(conn); - } - - // As soon as this is called, our "perform" loop is responsible for - // cleaning things up. That's why the above code is in a block. If anything - // fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do - // cleanup. But if things fail after `curl_multi_add_handle`, we expect - // perfom to pickup the failure and cleanup. 
- self.in_use.append(&conn.node); - self.handles.add(conn) catch |err| { - transfer._conn = null; - transfer.deinit(); - self.in_use.remove(&conn.node); - self.releaseConn(conn); - return err; - }; - self.active += 1; - - if (transfer.req.start_callback) |cb| { - cb(Response.fromTransfer(transfer)) catch |err| { - transfer.deinit(); - return err; - }; - } - _ = try self.perform(0); -} - -pub const PerformStatus = enum { - cdp_socket, - normal, -}; - -fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { - const running = blk: { - self.performing = true; - defer self.performing = false; - - break :blk try self.handles.perform(); - }; - - // Process dirty connections — return them to Network pool. - while (self.dirty.popFirst()) |node| { - const conn: *http.Connection = @fieldParentPtr("node", node); - self.handles.remove(conn) catch |err| { - log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); - @panic("multi_remove_handle"); - }; - self.releaseConn(conn); - } - - // We're potentially going to block for a while until we get data. Process - // whatever messages we have waiting ahead of time. - if (try self.processMessages()) { - return .normal; - } - - var status = PerformStatus.normal; - if (self.cdp_client) |cdp_client| { - var wait_fds = [_]http.WaitFd{.{ - .fd = cdp_client.socket, - .events = .{ .pollin = true }, - .revents = .{}, - }}; - try self.handles.poll(&wait_fds, timeout_ms); - if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) { - status = .cdp_socket; - } - } else if (running > 0) { - try self.handles.poll(&.{}, timeout_ms); - } - - _ = try self.processMessages(); - return status; -} - -fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool { - // Detect auth challenge from response headers. - // Also check on RecvError: proxy may send 407 with headers before - // closing the connection (CONNECT tunnel not yet established). 
- if (msg.err == null or msg.err.? == error.RecvError) { - transfer.detectAuthChallenge(&msg.conn); - } - - // In case of auth challenge - // TODO give a way to configure the number of auth retries. - if (transfer._auth_challenge != null and transfer._tries < 10) { - var wait_for_interception = false; - transfer.req.notification.dispatch( - .http_request_auth_required, - &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception }, - ); - if (wait_for_interception) { - self.intercepted += 1; - if (comptime IS_DEBUG) { - log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted }); - } - transfer._intercept_state = .pending; - - // Wether or not this is a blocking request, we're not going - // to process it now. We can end the transfer, which will - // release the easy handle back into the pool. The transfer - // is still valid/alive (just has no handle). - transfer.releaseConn(); - if (!transfer.req.blocking) { - // In the case of an async request, we can just "forget" - // about this transfer until it gets updated asynchronously - // from some CDP command. - return false; - } - - // In the case of a sync request, we need to block until we - // get the CDP command for handling this case. - if (try self.waitForInterceptedResponse(transfer)) { - // we've been asked to continue with the request - // we can't process it here, since we're already inside - // a process, so we need to queue it and wait for the - // next tick (this is why it was safe to releaseConn - // above, because even in the "blocking" path, we still - // only process it on the next tick). - self.queue.append(&transfer._node); - } else { - // aborted, already cleaned up - } - - return false; - } - } - - // Handle redirects: reuse the same connection to preserve TCP state. 
- if (msg.err == null) { - const status = try msg.conn.getResponseCode(); - if (status >= 300 and status <= 399) { - try transfer.handleRedirect(); - - const conn = transfer._conn.?; - - try self.handles.remove(conn); - transfer._conn = null; - transfer._detached_conn = conn; // signal orphan for processMessages cleanup - - transfer.reset(); - try transfer.configureConn(conn); - try self.handles.add(conn); - transfer._detached_conn = null; - transfer._conn = conn; // reattach after successful re-add - - _ = try self.perform(0); - - return false; - } - } - - // Transfer is done (success or error). Caller (processMessages) owns deinit. - // Return true = done (caller will deinit), false = continues (redirect/auth). - - // When the server sends "Connection: close" and closes the TLS - // connection without a close_notify alert, BoringSSL reports - // RecvError. If we already received valid HTTP headers, this is - // a normal end-of-body (the connection closure signals the end - // of the response per HTTP/1.1 when there is no Content-Length). - // We must check this before endTransfer, which may reset the - // easy handle. - const is_conn_close_recv = blk: { - const err = msg.err orelse break :blk false; - if (err != error.RecvError) break :blk false; - const hdr = msg.conn.getResponseHeader("connection", 0) orelse break :blk false; - break :blk std.ascii.eqlIgnoreCase(hdr.value, "close"); - }; - - // make sure the transfer can't be immediately aborted from a callback - // since we still need it here. - transfer._performing = true; - defer transfer._performing = false; - - if (msg.err != null and !is_conn_close_recv) { - transfer.requestFailed(transfer._callback_error orelse msg.err.?, true); - return true; - } - - if (!transfer._header_done_called) { - // In case of request w/o data, we need to call the header done - // callback now. 
- const proceed = try transfer.headerDoneCallback(&msg.conn); - if (!proceed) { - transfer.requestFailed(error.Abort, true); - return true; - } - } - - const body = transfer._stream_buffer.items; - - // Replay buffered body through user's data_callback. - if (transfer._stream_buffer.items.len > 0) { - try transfer.req.data_callback(Response.fromTransfer(transfer), body); - - transfer.req.notification.dispatch(.http_response_data, &.{ - .data = body, - .transfer = transfer, - }); - - if (transfer.aborted) { - transfer.requestFailed(error.Abort, true); - return true; - } - } - - if (transfer._pending_cache_metadata) |metadata| { - const cache = &self.network.cache.?; - cache.put(metadata.*, body) catch |err| { - log.warn(.cache, "cache put failed", .{ .err = err }); - }; - } - - // release conn ASAP so that it's available; some done_callbacks - // will load more resources. - transfer.releaseConn(); - - try transfer.req.done_callback(transfer.req.ctx); - - transfer.req.notification.dispatch(.http_request_done, &.{ - .transfer = transfer, - }); - - return true; -} - -fn processMessages(self: *Client) !bool { - var processed = false; - while (self.handles.readMessage()) |msg| { - const transfer = try Transfer.fromConnection(&msg.conn); - const done = self.processOneMessage(msg, transfer) catch |err| blk: { - log.err(.http, "process_messages", .{ .err = err, .req = transfer }); - transfer.requestFailed(err, true); - if (transfer._detached_conn) |c| { - // Conn was removed from handles during redirect reconfiguration - // but not re-added. Release it directly to avoid double-remove. 
- self.in_use.remove(&c.node); - self.active -= 1; - self.releaseConn(c); - transfer._detached_conn = null; - } - break :blk true; - }; - if (done) { - transfer.deinit(); - processed = true; - } - } - return processed; -} - -fn removeConn(self: *Client, conn: *http.Connection) void { - self.in_use.remove(&conn.node); - self.active -= 1; - if (self.handles.remove(conn)) { - self.releaseConn(conn); - } else |_| { - // Can happen if we're in a perform() call, so we'll queue this - // for cleanup later. - self.dirty.append(&conn.node); - } -} - -fn releaseConn(self: *Client, conn: *http.Connection) void { - self.network.releaseConnection(conn); -} - -fn ensureNoActiveConnection(self: *const Client) !void { - if (self.active > 0) { - return error.InflightConnection; - } -} - pub const Request = struct { frame_id: u32, method: Method, @@ -1041,23 +1009,22 @@ pub const Request = struct { credentials: ?[:0]const u8 = null, notification: *Notification, max_response_size: ?usize = null, - - // This is only relevant for intercepted requests. If a request is flagged - // as blocking AND is intercepted, then it'll be up to us to wait until - // we receive a response to the interception. This probably isn't ideal, - // but it's harder for our caller (ScriptManager) to deal with this. One - // reason for that is the Http Client is already a bit CDP-aware. 
blocking: bool = false, - - // arbitrary data that can be associated with this request ctx: *anyopaque = undefined, - start_callback: ?*const fn (response: Response) anyerror!void = null, - header_callback: *const fn (response: Response) anyerror!bool, - data_callback: *const fn (response: Response, data: []const u8) anyerror!void, - done_callback: *const fn (ctx: *anyopaque) anyerror!void, - error_callback: *const fn (ctx: *anyopaque, err: anyerror) void, - shutdown_callback: ?*const fn (ctx: *anyopaque) void = null, + start_callback: ?StartCallback = null, + header_callback: HeaderCallback, + data_callback: DataCallback, + done_callback: DoneCallback, + error_callback: ErrorCallback, + shutdown_callback: ?ShutdownCallback = null, + + pub const StartCallback = *const fn (response: Response) anyerror!void; + pub const HeaderCallback = *const fn (response: Response) anyerror!bool; + pub const DataCallback = *const fn (response: Response, data: []const u8) anyerror!void; + pub const DoneCallback = *const fn (ctx: *anyopaque) anyerror!void; + pub const ErrorCallback = *const fn (ctx: *anyopaque, err: anyerror) void; + pub const ShutdownCallback = *const fn (ctx: *anyopaque) void; const ResourceType = enum { document, @@ -1065,10 +1032,6 @@ pub const Request = struct { script, fetch, - // Allowed Values: Document, Stylesheet, Image, Media, Font, Script, - // TextTrack, XHR, Fetch, Prefetch, EventSource, WebSocket, Manifest, - // SignedExchange, Ping, CSPViolationReport, Preflight, FedCM, Other - // https://chromedevtools.github.io/devtools-protocol/tot/Network/#type-ResourceType pub fn string(self: ResourceType) []const u8 { return switch (self) { .document => "Document", @@ -1160,47 +1123,23 @@ pub const Transfer = struct { id: u32 = 0, req: Request, url: [:0]const u8, - client: *Client, - // total bytes received in the response, including the response status line, - // the headers, and the [encoded] body. 
+ client: *Transport, bytes_received: usize = 0, - _pending_cache_metadata: ?*CacheMetadata = null, - aborted: bool = false, - max_response_size: ?usize = null, - - // We'll store the response header here response_header: ?ResponseHead = null, - - // track if the header callbacks done have been called. _header_done_called: bool = false, - _notified_fail: bool = false, - _conn: ?*http.Connection = null, - // Set when conn is temporarily detached from transfer during redirect - // reconfiguration. Used by processMessages to release the orphaned conn - // if reconfiguration fails. _detached_conn: ?*http.Connection = null, - _auth_challenge: ?http.AuthChallenge = null, - - // number of times the transfer has been tried. - // incremented by reset func. _tries: u8 = 0, _performing: bool = false, _redirect_count: u8 = 0, _skip_body: bool = false, _first_data_received: bool = false, - - // Buffered response body. Filled by dataCallback, consumed in processMessages. _stream_buffer: std.ArrayList(u8) = .{}, - - // Error captured in dataCallback to be reported in processMessages. _callback_error: ?anyerror = null, - - // for when a Transfer is queued in the client.queue _node: std.DoublyLinkedList.Node = .{}, _intercept_state: InterceptState = .not_intercepted, @@ -1224,7 +1163,6 @@ pub const Transfer = struct { self.client.removeConn(conn); self._conn = null; } - self.req.headers.deinit(); self.arena.deinit(); self.client.transfer_pool.destroy(self); @@ -1232,15 +1170,10 @@ pub const Transfer = struct { pub fn abort(self: *Transfer, err: anyerror) void { self.requestFailed(err, true); - if (self._performing or self.client.performing) { - // We're currently in a curl_multi_perform. We cannot call - // curl_multi_remove_handle from a curl callback. Instead, we flag - // this transfer and our callbacks will check for this flag. 
self.aborted = true; return; } - self.deinit(); } @@ -1249,20 +1182,9 @@ pub const Transfer = struct { self.deinit(); } - // internal, when the page is shutting down. Doesn't have the same ceremony - // as abort (doesn't send a notification, doesn't invoke an error callback) fn kill(self: *Transfer) void { - if (self.req.shutdown_callback) |cb| { - cb(self.req.ctx); - } - + if (self.req.shutdown_callback) |cb| cb(self.req.ctx); if (self._performing or self.client.performing) { - // We're currently inside of a callback. This client, and libcurl - // generally don't expect a transfer to become deinitialized during - // a callback. We can flag the transfer as aborted (which is what - // we do when transfer.abort() is called in this condition) AND, - // since this "kill()"should prevent any future callbacks, the best - // we can do is null/noop them. self.aborted = true; self.req.start_callback = null; self.req.shutdown_callback = null; @@ -1272,22 +1194,16 @@ pub const Transfer = struct { self.req.error_callback = Noop.errorCallback; return; } - self.deinit(); } - // We can force a failed request within a callback, which will eventually - // result in this being called again in the more general loop. We do this - // because we can raise a more specific error inside a callback in some cases. fn requestFailed(self: *Transfer, err: anyerror, comptime execute_callback: bool) void { if (self._notified_fail) return; self._notified_fail = true; - self.req.notification.dispatch(.http_request_fail, &.{ .transfer = self, .err = err, }); - if (execute_callback) { self.req.error_callback(self.req.ctx, err); } else if (self.req.shutdown_callback) |cb| { @@ -1299,14 +1215,13 @@ pub const Transfer = struct { const client = self.client; const req = &self.req; - // Set callbacks and per-client settings on the pooled connection. 
try conn.setCallbacks(Transfer.dataCallback); try conn.setFollowLocation(false); try conn.setProxy(client.http_proxy); try conn.setTlsVerify(client.tls_verify, client.use_proxy); - try conn.setURL(req.url); try conn.setMethod(req.method); + if (req.body) |b| { try conn.setBody(b); } else { @@ -1317,20 +1232,12 @@ pub const Transfer = struct { try conn.secretHeaders(&header_list, &client.network.config.http_headers); try conn.setHeaders(&header_list); - // If we have WebBotAuth, sign our request. - if (client.network.web_bot_auth) |*wba| { - const authority = URL.getHost(req.url); - try wba.signRequest(self.arena.allocator(), &header_list, authority); - } - - // Add cookies from cookie jar. if (try self.getCookieString()) |cookies| { try conn.setCookies(@ptrCast(cookies.ptr)); } try conn.setPrivate(self); - // add credentials if (req.credentials) |creds| { if (self._auth_challenge != null and self._auth_challenge.?.source == .proxy) { try conn.setProxyCredentials(creds); @@ -1357,20 +1264,16 @@ pub const Transfer = struct { if (comptime IS_DEBUG) { std.debug.assert(self.response_header == null); } - const url = try conn.getEffectiveUrl(); - const status: u16 = if (self._auth_challenge != null) 407 else try conn.getResponseCode(); - self.response_header = .{ .url = url, .status = status, .redirect_count = self._redirect_count, }; - if (conn.getResponseHeader("content-type", 0)) |ct| { var hdr = &self.response_header.?; const value = ct.value; @@ -1395,15 +1298,11 @@ pub const Transfer = struct { } pub fn format(self: *Transfer, writer: *std.Io.Writer) !void { - const req = self.req; - return writer.print("{s} {s}", .{ @tagName(req.method), req.url }); + return writer.print("{s} {s}", .{ @tagName(self.req.method), self.req.url }); } pub fn updateURL(self: *Transfer, url: [:0]const u8) !void { - // for cookies self.url = url; - - // for the request itself self.req.url = url; } @@ -1417,23 +1316,15 @@ pub const Transfer = struct { return error.TooManyRedirects; } - // 
retrieve cookies from the redirect's response. if (req.cookie_jar) |jar| { var i: usize = 0; while (conn.getResponseHeader("set-cookie", i)) |ct| : (i += 1) { try jar.populateFromResponse(transfer.url, ct.value); - - if (i >= ct.amount) { - break; - } + if (i >= ct.amount) break; } } - // resolve the redirect target. - const location = conn.getResponseHeader("location", 0) orelse { - return error.LocationNotFound; - }; - + const location = conn.getResponseHeader("location", 0) orelse return error.LocationNotFound; const base_url = try conn.getEffectiveUrl(); const url = try URL.resolve(arena, std.mem.span(base_url), location.value, .{}); try transfer.updateURL(url); @@ -1475,23 +1366,17 @@ pub const Transfer = struct { pub fn replaceRequestHeaders(self: *Transfer, allocator: Allocator, headers: []const http.Header) !void { self.req.headers.deinit(); - var buf: std.ArrayList(u8) = .empty; var new_headers = try self.client.newHeaders(); for (headers) |hdr| { - // safe to re-use this buffer, because Headers.add because curl copies - // the value we pass into curl_slist_append. defer buf.clearRetainingCapacity(); try std.fmt.format(buf.writer(allocator), "{s}: {s}", .{ hdr.name, hdr.value }); - try buf.append(allocator, 0); // null terminated + try buf.append(allocator, 0); try new_headers.add(buf.items[0 .. buf.items.len - 1 :0]); } self.req.headers = new_headers; } - // abortAuthChallenge is called when an auth challenge interception is - // abort. We don't call self.releaseConn here b/c it has been done - // before interception process. pub fn abortAuthChallenge(self: *Transfer) void { if (comptime IS_DEBUG) { std.debug.assert(self._intercept_state != .not_intercepted); @@ -1505,9 +1390,6 @@ pub const Transfer = struct { self._intercept_state = .{ .abort = error.AbortAuthChallenge }; } - // headerDoneCallback is called once the headers have been read. - // It can be called either on dataCallback or once the request for those - // w/o body. 
fn headerDoneCallback(transfer: *Transfer, conn: *const http.Connection) !bool { lp.assert(transfer._header_done_called == false, "Transfer.headerDoneCallback", .{}); defer transfer._header_done_called = true; @@ -1530,76 +1412,21 @@ pub const Transfer = struct { if (transfer.max_response_size) |max_size| { if (transfer.getContentLength()) |cl| { - if (cl > max_size) { - return error.ResponseTooLarge; - } + if (cl > max_size) return error.ResponseTooLarge; } } - transfer.req.notification.dispatch(.http_response_header_done, &.{ - .transfer = transfer, - }); + transfer.req.notification.dispatch(.http_response_header_done, &.{ .transfer = transfer }); const proceed = transfer.req.header_callback(Response.fromTransfer(transfer)) catch |err| { log.err(.http, "header_callback", .{ .err = err, .req = transfer }); return err; }; - if (transfer.client.network.cache != null and transfer.req.method == .GET) { - const rh = &transfer.response_header.?; - const allocator = transfer.arena.allocator(); - - const vary = if (conn.getResponseHeader("vary", 0)) |h| h.value else null; - - const maybe_cm = try Cache.tryCache( - allocator, - std.time.timestamp(), - transfer.url, - rh.status, - rh.contentType(), - if (conn.getResponseHeader("cache-control", 0)) |h| h.value else null, - vary, - if (conn.getResponseHeader("age", 0)) |h| h.value else null, - conn.getResponseHeader("set-cookie", 0) != null, - conn.getResponseHeader("authorization", 0) != null, - ); - - if (maybe_cm) |cm| { - var iter = transfer.responseHeaderIterator(); - var header_list = try iter.collect(allocator); - const end_of_response = header_list.items.len; - - if (vary) |vary_str| { - var req_it = transfer.req.headers.iterator(); - - while (req_it.next()) |hdr| { - var vary_iter = std.mem.splitScalar(u8, vary_str, ','); - - while (vary_iter.next()) |part| { - const name = std.mem.trim(u8, part, &std.ascii.whitespace); - if (std.ascii.eqlIgnoreCase(hdr.name, name)) { - try header_list.append(allocator, .{ - .name = 
try allocator.dupe(u8, hdr.name), - .value = try allocator.dupe(u8, hdr.value), - }); - } - } - } - } - - const metadata = try transfer.arena.allocator().create(CacheMetadata); - metadata.* = cm; - metadata.headers = header_list.items[0..end_of_response]; - metadata.vary_headers = header_list.items[end_of_response..]; - transfer._pending_cache_metadata = metadata; - } - } - return proceed and transfer.aborted == false; } fn dataCallback(buffer: [*]const u8, chunk_count: usize, chunk_len: usize, data: *anyopaque) usize { - // libcurl should only ever emit 1 chunk at a time if (comptime IS_DEBUG) { std.debug.assert(chunk_count == 1); } @@ -1613,7 +1440,6 @@ pub const Transfer = struct { if (!transfer._first_data_received) { transfer._first_data_received = true; - // Skip body for responses that will be retried (redirects, auth challenges). const status = conn.getResponseCode() catch |err| { log.err(.http, "getResponseCode", .{ .err = err, .source = "body callback" }); return http.writefunc_error; @@ -1623,7 +1449,6 @@ pub const Transfer = struct { return @intCast(chunk_len); } - // Pre-size buffer from Content-Length. if (transfer.getContentLength()) |cl| { if (transfer.max_response_size) |max_size| { if (cl > max_size) { @@ -1651,23 +1476,14 @@ pub const Transfer = struct { return http.writefunc_error; }; - if (transfer.aborted) { - return http.writefunc_error; - } - + if (transfer.aborted) return http.writefunc_error; return @intCast(chunk_len); } pub fn responseHeaderIterator(self: *Transfer) HeaderIterator { if (self._conn) |conn| { - // If we have a connection, than this is a real curl request and we - // iterate through the header that curl maintains. return .{ .curl = .{ .conn = conn } }; } - // If there's no handle, it either means this is being called before - // the request is even being made (which would be a bug in the code) - // or when a response was injected via transfer.fulfill. 
The injected - // header should be iterated, since there is no handle/easy. return .{ .list = .{ .list = self.response_header.?._injected_headers } }; } @@ -1678,12 +1494,9 @@ pub const Transfer = struct { pub fn fulfill(transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { if (transfer._conn != null) { - // should never happen, should have been intercepted/paused, and then - // either continued, aborted or fulfilled once. @branchHint(.unlikely); return error.RequestInProgress; } - transfer._fulfill(status, headers, body) catch |err| { transfer.req.error_callback(transfer.req.ctx, err); return err; @@ -1695,7 +1508,6 @@ pub const Transfer = struct { if (req.start_callback) |cb| { try cb(Response.fromTransfer(transfer)); } - transfer.response_header = .{ .status = status, .url = req.url, @@ -1710,22 +1522,17 @@ pub const Transfer = struct { break; } } - lp.assert(transfer._header_done_called == false, "Transfer.fulfill header_done_called", .{}); if (try req.header_callback(Response.fromTransfer(transfer)) == false) { transfer.abort(error.Abort); return; } - if (body) |b| { try req.data_callback(Response.fromTransfer(transfer), b); } - try req.done_callback(req.ctx); } - // This function should be called during the dataCallback. Calling it after - // such as in the doneCallback is guaranteed to return null. pub fn getContentLength(self: *const Transfer) ?u32 { const cl = self.getContentLengthRawValue() orelse return null; return std.fmt.parseInt(u32, cl, 10) catch null; @@ -1733,23 +1540,15 @@ pub const Transfer = struct { fn getContentLengthRawValue(self: *const Transfer) ?[]const u8 { if (self._conn) |conn| { - // If we have a connection, than this is a normal request. We can get the - // header value from the connection. const cl = conn.getResponseHeader("content-length", 0) orelse return null; return cl.value; } - - // If we have no handle, then maybe this is being called after the - // doneCallback. 
OR, maybe this is a "fulfilled" request. Let's check - // the injected headers (if we have any). - const rh = self.response_header orelse return null; for (rh._injected_headers) |hdr| { if (std.ascii.eqlIgnoreCase(hdr.name, "content-length")) { return hdr.value; } } - return null; } }; diff --git a/src/browser/Runner.zig b/src/browser/Runner.zig index 4ee753ea..e5c27324 100644 --- a/src/browser/Runner.zig +++ b/src/browser/Runner.zig @@ -135,7 +135,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { .pre, .raw, .text, .image => { // The main page hasn't started/finished navigating. // There's no JS to run, and no reason to run the scheduler. - if (http_client.active == 0 and (comptime is_cdp) == false) { + if (http_client.active() == 0 and (comptime is_cdp) == false) { // haven't started navigating, I guess. return .done; } @@ -169,8 +169,8 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { // Each call to this runs scheduled load events. try page.dispatchLoad(); - const http_active = http_client.active; - const total_network_activity = http_active + http_client.intercepted; + const http_active = http_client.active(); + const total_network_activity = http_active + http_client.intercepted(); if (page._notified_network_almost_idle.check(total_network_activity <= 2)) { page.notifyNetworkAlmostIdle(); } @@ -183,7 +183,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { // because is_cdp is true, and that can only be // the case when interception isn't possible. 
if (comptime IS_DEBUG) { - std.debug.assert(http_client.intercepted == 0); + std.debug.assert(http_client.intercepted() == 0); } if (browser.hasBackgroundTasks()) { diff --git a/src/cdp/domains/page.zig b/src/cdp/domains/page.zig index cf3cdd7d..e4ea8758 100644 --- a/src/cdp/domains/page.zig +++ b/src/cdp/domains/page.zig @@ -139,8 +139,8 @@ fn setLifecycleEventsEnabled(cmd: *CDP.Command) !void { try sendPageLifecycle(bc, "load", now, frame_id, loader_id); const http_client = page._session.browser.http_client; - const http_active = http_client.active; - const total_network_activity = http_active + http_client.intercepted; + const http_active = http_client.active(); + const total_network_activity = http_active + http_client.intercepted(); if (page._notified_network_almost_idle.check(total_network_activity <= 2)) { try sendPageLifecycle(bc, "networkAlmostIdle", now, frame_id, loader_id); } diff --git a/src/network/layer/CacheLayer.zig b/src/network/layer/CacheLayer.zig new file mode 100644 index 00000000..fae7935e --- /dev/null +++ b/src/network/layer/CacheLayer.zig @@ -0,0 +1,240 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+
+const std = @import("std");
+const log = @import("../../log.zig");
+
+const http = @import("../http.zig");
+const Transfer = @import("../../browser/HttpClient.zig").Transfer;
+const Context = @import("../../browser/HttpClient.zig").Context;
+const Request = @import("../../browser/HttpClient.zig").Request;
+const Response = @import("../../browser/HttpClient.zig").Response;
+const Layer = @import("../../browser/HttpClient.zig").Layer;
+
+const Cache = @import("../cache/Cache.zig");
+const CachedMetadata = @import("../cache/Cache.zig").CachedMetadata;
+const CachedResponse = @import("../cache/Cache.zig").CachedResponse;
+const Forward = @import("Forward.zig");
+
+const CacheLayer = @This();
+
+/// Next layer in the chain. Defaults to `undefined`, so it must be assigned
+/// before `request` is invoked.
+next: Layer = undefined,
+
+/// Exposes this cache layer through the type-erased `Layer` interface.
+pub fn layer(self: *CacheLayer) Layer {
+    return .{
+        .ptr = self,
+        .vtable = &.{
+            .request = request,
+        },
+    };
+}
+
+/// `Layer` entry point.
+///
+/// Non-GET requests, and every request when no cache is configured, go
+/// straight to the next layer. A GET that hits the cache is answered from the
+/// stored entry via `serveFromCache`. A miss is forwarded to the next layer
+/// with its callbacks wrapped by a `CacheContext`, so the response can be
+/// stored once it completes.
+fn request(ptr: *anyopaque, ctx: Context, req: Request) anyerror!void {
+    const self: *CacheLayer = @ptrCast(@alignCast(ptr));
+    const network = ctx.network;
+
+    if (network.cache == null or req.method != .GET) {
+        return self.next.request(ctx, req);
+    }
+
+    const arena = try network.app.arena_pool.acquire(.{ .debug = "CacheLayer" });
+
+    const maybe_cached = lookup: {
+        // Scoped to the lookup only. A function-wide errdefer would also fire
+        // when `serveFromCache` fails below, on top of that branch's own
+        // `defer`, releasing the arena twice.
+        errdefer network.app.arena_pool.release(arena);
+
+        var iter = req.headers.iterator();
+        const req_header_list = try iter.collect(arena);
+
+        break :lookup network.cache.?.get(arena, .{
+            .url = req.url,
+            .timestamp = std.time.timestamp(),
+            .request_headers = req_header_list.items,
+        });
+    };
+
+    if (maybe_cached) |cached| {
+        // Hit: the request never reaches the next layer, so this layer cleans
+        // up the request headers and the arena, whether serving succeeds or
+        // fails.
+        defer req.headers.deinit();
+        defer network.app.arena_pool.release(arena);
+        return serveFromCache(req, &cached);
+    }
+
+    // Miss: from here on the arena belongs to the CacheContext, which releases
+    // it from its done/error/shutdown callback.
+    // NOTE(review): if a downstream layer can both invoke the wrapped error
+    // callback and return an error, this errdefer releases the arena a second
+    // time. Confirm the Layer contract.
+    errdefer network.app.arena_pool.release(arena);
+
+    const cache_ctx = try arena.create(CacheContext);
+    cache_ctx.* = .{
+        .arena = arena,
+        .context = ctx,
+        .forward = Forward.fromRequest(req),
+        .req_url = req.url,
+        .req_headers = req.headers,
+    };
+
+    const wrapped = cache_ctx.forward.wrapRequest(
+        req,
+        cache_ctx,
+        "forward",
+        .{
+            .start = CacheContext.startCallback,
+            .header = CacheContext.headerCallback,
+            .done = CacheContext.doneCallback,
+            .shutdown = CacheContext.shutdownCallback,
+            .err = CacheContext.errorCallback,
+        },
+    );
+
+    return self.next.request(ctx, wrapped);
+}
+
+/// Replays a cached response through the request's own callbacks, in order:
+/// start (if set), header, data, done.
+///
+/// If the header callback declines the response, the error callback is
+/// invoked with `error.Abort` and the function returns without error. A
+/// file-backed entry is streamed in chunks of at most 1024 bytes and its file
+/// handle is closed on every exit path.
+fn serveFromCache(req: Request, cached: *const CachedResponse) !void {
+    const response = Response.fromCached(req.ctx, cached);
+    defer switch (cached.data) {
+        .buffer => {},
+        .file => |f| f.file.close(),
+    };
+
+    if (req.start_callback) |cb| {
+        try cb(response);
+    }
+
+    const proceed = try req.header_callback(response);
+    if (!proceed) {
+        req.error_callback(req.ctx, error.Abort);
+        return;
+    }
+
+    switch (cached.data) {
+        .buffer => |data| {
+            if (data.len > 0) {
+                try req.data_callback(response, data);
+            }
+        },
+        .file => |f| {
+            const file = f.file;
+            var buf: [1024]u8 = undefined;
+            var file_reader = file.reader(&buf);
+            try file_reader.seekTo(f.offset);
+            const reader = &file_reader.interface;
+            var read_buf: [1024]u8 = undefined;
+            var remaining = f.len;
+            while (remaining > 0) {
+                const read_len = @min(read_buf.len, remaining);
+                const n = try reader.readSliceShort(read_buf[0..read_len]);
+                if (n == 0) {
+                    // The file ended before the recorded body length. The
+                    // bytes already delivered cannot be recalled, so make the
+                    // truncation visible instead of ending silently.
+                    log.warn(.http, "cache short read", .{ .missing = remaining });
+                    break;
+                }
+                remaining -= n;
+                try req.data_callback(response, read_buf[0..n]);
+            }
+        },
+    }
+
+    try req.done_callback(req.ctx);
+}
+
+/// Per-request state for a cache miss.
+///
+/// The context is allocated inside `arena` itself (see `request`), so
+/// releasing the arena also invalidates the context. Each terminal callback
+/// therefore releases it as its very last action.
+const CacheContext = struct {
+    arena: std.mem.Allocator,
+    context: Context,
+    transfer: ?*Transfer = null,
+    forward: Forward,
+    req_url: [:0]const u8,
+    req_headers: http.Headers,
+    pending_metadata: ?*CachedMetadata = null,
+
+    /// Hands the arena (and with it `self`) back to the pool. `self` must not
+    /// be touched after this returns.
+    fn release(self: *CacheContext) void {
+        self.context.network.app.arena_pool.release(self.arena);
+    }
+
+    /// Remembers the transfer, then forwards to the original start callback.
+    fn startCallback(response: Response) anyerror!void {
+        const self: *CacheContext = @ptrCast(@alignCast(response.ctx));
+        self.transfer = response.inner.transfer;
+        return self.forward.forwardStart(response);
+    }
+
+    /// Asks `Cache.tryCache` whether the response may be stored. If so, the
+    /// metadata to write is recorded for `doneCallback`. The original header
+    /// callback is always invoked afterwards.
+    fn headerCallback(response: Response) anyerror!bool {
+        const self: *CacheContext = @ptrCast(@alignCast(response.ctx));
+        const allocator = self.arena;
+
+        const transfer = response.inner.transfer;
+
+        // A response injected through `Transfer.fulfill` reaches this callback
+        // with no connection attached. There are no curl headers to inspect,
+        // so skip caching rather than unwrap a null connection.
+        const conn = transfer._conn orelse return self.forward.forwardHeader(response);
+        const rh = &transfer.response_header.?;
+
+        const vary = if (conn.getResponseHeader("vary", 0)) |h| h.value else null;
+
+        const maybe_cm = try Cache.tryCache(
+            allocator,
+            std.time.timestamp(),
+            transfer.url,
+            rh.status,
+            rh.contentType(),
+            if (conn.getResponseHeader("cache-control", 0)) |h| h.value else null,
+            vary,
+            if (conn.getResponseHeader("age", 0)) |h| h.value else null,
+            conn.getResponseHeader("set-cookie", 0) != null,
+            conn.getResponseHeader("authorization", 0) != null,
+        );
+
+        if (maybe_cm) |cm| {
+            var iter = transfer.responseHeaderIterator();
+            var header_list = try iter.collect(allocator);
+            const end_of_response = header_list.items.len;
+
+            // Append the request headers named by Vary after the response
+            // headers; the list is split into the two slices below.
+            if (vary) |vary_str| {
+                var req_it = self.req_headers.iterator();
+                while (req_it.next()) |hdr| {
+                    var vary_iter = std.mem.splitScalar(u8, vary_str, ',');
+                    while (vary_iter.next()) |part| {
+                        const name = std.mem.trim(u8, part, &std.ascii.whitespace);
+                        if (std.ascii.eqlIgnoreCase(hdr.name, name)) {
+                            try header_list.append(allocator, .{
+                                .name = try allocator.dupe(u8, hdr.name),
+                                .value = try allocator.dupe(u8, hdr.value),
+                            });
+                        }
+                    }
+                }
+            }
+
+            // Recorded for every cacheable response, not only those carrying
+            // a Vary header. Without Vary, `vary_headers` is simply empty.
+            const metadata = try allocator.create(CachedMetadata);
+            metadata.* = cm;
+            metadata.headers = header_list.items[0..end_of_response];
+            metadata.vary_headers = header_list.items[end_of_response..];
+            self.pending_metadata = metadata;
+            // Keep the transfer next to the metadata so `doneCallback` can
+            // always reach the buffered body when there is something to store.
+            self.transfer = transfer;
+        }
+
+        return self.forward.forwardHeader(response);
+    }
+
+    /// Stores the buffered body if `headerCallback` marked the response as
+    /// cacheable, then forwards to the original done callback.
+    ///
+    /// The arena is released only when the forwarded callback succeeds. When
+    /// it fails, `Transfer.fulfill` reports the failure through the error
+    /// callback, and `errorCallback` still needs this context and performs the
+    /// release.
+    /// NOTE(review): presumably the regular transfer loop also invokes the
+    /// error callback after a failed done callback. Confirm, otherwise the
+    /// arena leaks on that path.
+    fn doneCallback(ctx: *anyopaque) anyerror!void {
+        const self: *CacheContext = @ptrCast(@alignCast(ctx));
+
+        if (self.pending_metadata) |metadata| {
+            if (self.transfer) |transfer| {
+                const cache = &self.context.network.cache.?;
+
+                log.debug(.browser, "http cache", .{ .key = self.req_url, .metadata = metadata });
+                // A failed cache write must not fail the request itself.
+                if (cache.put(metadata.*, transfer._stream_buffer.items)) |_| {
+                    log.debug(.browser, "http.cache.put", .{ .url = self.req_url });
+                } else |err| {
+                    log.warn(.http, "cache put failed", .{ .err = err });
+                }
+            }
+        }
+
+        try self.forward.forwardDone();
+        self.release();
+    }
+
+    /// Forwards the shutdown notification, then releases the arena.
+    fn shutdownCallback(ctx: *anyopaque) void {
+        const self: *CacheContext = @ptrCast(@alignCast(ctx));
+        defer self.release();
+        self.forward.forwardShutdown();
+    }
+
+    /// Forwards the error, then releases the arena.
+    fn errorCallback(ctx: *anyopaque, e: anyerror) void {
+        const self: *CacheContext = @ptrCast(@alignCast(ctx));
+        defer self.release();
+        self.forward.forwardErr(e);
+    }
+};
diff --git a/src/network/layer/Forward.zig b/src/network/layer/Forward.zig
new file mode 100644
index 00000000..8f0521a8
--- /dev/null
+++ b/src/network/layer/Forward.zig
@@ -0,0 +1,135 @@
+// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
+//
+// Francis Bouvier
+// Pierre Tachoire
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+const Request = @import("../../browser/HttpClient.zig").Request;
+const Response = @import("../../browser/HttpClient.zig").Response;
+
+const Forward = @This();
+
+// Snapshot of the callbacks (and their context pointer) that a request
+// carried before a layer wrapped it. The `forward*` functions below replay an
+// event to these original callbacks.
+ctx: *anyopaque,
+start: ?Request.StartCallback,
+header: Request.HeaderCallback,
+data: Request.DataCallback,
+done: Request.DoneCallback,
+err: Request.ErrorCallback,
+shutdown: ?Request.ShutdownCallback,
+
+/// Captures the context pointer and every callback of `req`.
+pub fn fromRequest(req: Request) Forward {
+    return .{
+        .ctx = req.ctx,
+        // optional callbacks
+        .start = req.start_callback,
+        .shutdown = req.shutdown_callback,
+        // mandatory callbacks
+        .header = req.header_callback,
+        .data = req.data_callback,
+        .done = req.done_callback,
+        .err = req.error_callback,
+    };
+}
+
+/// Callbacks a layer wants to handle itself. Anything left `null` is routed
+/// straight to the captured original by `wrapRequest`.
+pub const Overrides = struct {
+    start: ?Request.StartCallback = null,
+    header: ?Request.HeaderCallback = null,
+    data: ?Request.DataCallback = null,
+    done: ?Request.DoneCallback = null,
+    err: ?Request.ErrorCallback = null,
+    shutdown: ?Request.ShutdownCallback = null,
+};
+
+/// Returns a copy of `req` whose context is `new_ctx` and whose callbacks are
+/// taken from `overrides`, or else from generated pass-through functions.
+///
+/// `new_ctx` must point to a value with a member named `field` holding the
+/// `Forward` that the pass-through functions replay to. The optional start
+/// and shutdown callbacks stay `null` unless they are overridden or the
+/// captured request had one.
+pub fn wrapRequest(
+    self: *Forward,
+    req: Request,
+    new_ctx: anytype,
+    comptime field: []const u8,
+    overrides: Overrides,
+) Request {
+    const Hooks = makePassthrough(@TypeOf(new_ctx.*), field);
+
+    const start_cb: ?Request.StartCallback =
+        overrides.start orelse if (self.start != null) Hooks.start else null;
+    const shutdown_cb: ?Request.ShutdownCallback =
+        overrides.shutdown orelse if (self.shutdown != null) Hooks.shutdown else null;
+
+    var result = req;
+    result.ctx = new_ctx;
+    result.start_callback = start_cb;
+    result.header_callback = overrides.header orelse Hooks.header;
+    result.data_callback = overrides.data orelse Hooks.data;
+    result.done_callback = overrides.done orelse Hooks.done;
+    result.error_callback = overrides.err orelse Hooks.err;
+    result.shutdown_callback = shutdown_cb;
+    return result;
+}
+
+/// Generates callbacks that recover a `*T` from the opaque context pointer
+/// and replay the event through the `Forward` stored in `T`'s `field` member.
+fn makePassthrough(comptime T: type, comptime field: []const u8) type {
+    return struct {
+        /// Casts the opaque context pointer back to the wrapping type.
+        fn owner(ctx_ptr: *anyopaque) *T {
+            return @ptrCast(@alignCast(ctx_ptr));
+        }
+
+        pub fn start(response: Response) anyerror!void {
+            return @field(owner(response.ctx), field).forwardStart(response);
+        }
+
+        pub fn header(response: Response) anyerror!bool {
+            return @field(owner(response.ctx), field).forwardHeader(response);
+        }
+
+        pub fn data(response: Response, chunk: []const u8) anyerror!void {
+            return @field(owner(response.ctx), field).forwardData(response, chunk);
+        }
+
+        pub fn done(ctx_ptr: *anyopaque) anyerror!void {
+            return @field(owner(ctx_ptr), field).forwardDone();
+        }
+
+        pub fn err(ctx_ptr: *anyopaque, e: anyerror) void {
+            @field(owner(ctx_ptr), field).forwardErr(e);
+        }
+
+        pub fn shutdown(ctx_ptr: *anyopaque) void {
+            @field(owner(ctx_ptr), field).forwardShutdown();
+        }
+    };
+}
+
+/// Copies `response` with its context swapped back to the captured one.
+fn retarget(self: Forward, response: Response) Response {
+    var redirected = response;
+    redirected.ctx = self.ctx;
+    return redirected;
+}
+
+/// Invokes the captured start callback, if there is one.
+pub fn forwardStart(self: Forward, response: Response) anyerror!void {
+    const callback = self.start orelse return;
+    return callback(self.retarget(response));
+}
+
+/// Invokes the captured header callback and returns its verdict.
+pub fn forwardHeader(self: Forward, response: Response) anyerror!bool {
+    return self.header(self.retarget(response));
+}
+
+/// Invokes the captured data callback with `chunk`.
+pub fn forwardData(self: Forward, response: Response, chunk: []const u8) anyerror!void {
+    return self.data(self.retarget(response), chunk);
+}
+
+/// Invokes the captured done callback with the captured context.
+pub fn forwardDone(self: Forward) anyerror!void {
+    return self.done(self.ctx);
+}
+
+/// Invokes the captured error callback with the captured context.
+pub fn forwardErr(self: Forward, e: anyerror) void {
+    self.err(self.ctx, e);
+}
+
+/// Invokes the captured shutdown callback, if there is one.
+pub fn forwardShutdown(self: Forward) void {
+    const callback = self.shutdown orelse return;
+    callback(self.ctx);
+}
diff --git a/src/network/layer/RobotsLayer.zig b/src/network/layer/RobotsLayer.zig
new file mode 100644
index 00000000..e69de29b
diff --git a/src/network/layer/WebBotAuthLayer.zig b/src/network/layer/WebBotAuthLayer.zig
new file mode 100644
index 00000000..e69de29b