diff --git a/src/Server.zig b/src/Server.zig index 438bea14..2d0ae830 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -123,17 +123,23 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { defer client.deinit(); var http = &self.app.http; - http.monitorSocket(socket); - defer http.unmonitorSocket(); + http.addCDPClient(.{ + .socket = socket, + .ctx = client, + .blocking_read_start = Client.blockingReadStart, + .blocking_read = Client.blockingRead, + .blocking_read_end = Client.blockingReadStop, + }); + defer http.removeCDPClient(); std.debug.assert(client.mode == .http); while (true) { - if (http.poll(timeout_ms) != .extra_socket) { + if (http.poll(timeout_ms) != .cdp_socket) { log.info(.app, "CDP timeout", .{}); return; } - if (try client.readSocket() == false) { + if (client.readSocket() == false) { return; } @@ -147,19 +153,19 @@ fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { var ms_remaining = timeout_ms; while (true) { switch (cdp.pageWait(ms_remaining)) { - .extra_socket => { - if (try client.readSocket() == false) { + .cdp_socket => { + if (client.readSocket() == false) { return; } last_message = timestamp(.monotonic); ms_remaining = timeout_ms; }, .no_page => { - if (http.poll(ms_remaining) != .extra_socket) { + if (http.poll(ms_remaining) != .cdp_socket) { log.info(.app, "CDP timeout", .{}); return; } - if (try client.readSocket() == false) { + if (client.readSocket() == false) { return; } last_message = timestamp(.monotonic); @@ -229,7 +235,30 @@ pub const Client = struct { self.send_arena.deinit(); } - fn readSocket(self: *Client) !bool { + fn blockingReadStart(ctx: *anyopaque) bool { + const self: *Client = @ptrCast(@alignCast(ctx)); + _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))) catch |err| { + log.warn(.app, "CDP blockingReadStart", .{ .err = err }); + return false; + }; + return true; + } + + fn blockingRead(ctx: *anyopaque) 
bool { + const self: *Client = @ptrCast(@alignCast(ctx)); + return self.readSocket(); + } + + fn blockingReadStop(ctx: *anyopaque) bool { + const self: *Client = @ptrCast(@alignCast(ctx)); + _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| { + log.warn(.app, "CDP blockingReadStop", .{ .err = err }); + return false; + }; + return true; + } + + fn readSocket(self: *Client) bool { const n = posix.read(self.socket, self.readBuf()) catch |err| { log.warn(.app, "CDP read", .{ .err = err }); return false; diff --git a/src/browser/Page.zig b/src/browser/Page.zig index fd42b083..58b63dfb 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -743,7 +743,7 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult { var scheduler = &self.scheduler; var http_client = self._session.browser.http_client; - // I'd like the page to know NOTHING about extra_socket / CDP, but the + // I'd like the page to know NOTHING about cdp_socket / CDP, but the // fact is that the behavior of wait changes depending on whether or // not we're using CDP. // If we aren't using CDP, as soon as we think there's nothing left @@ -753,7 +753,7 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult { // we could let CDP poll http (like it does for HTTP requests), the fact // is that we know more about the timing of stuff (e.g. how long to // poll/sleep) in the page. - const exit_when_done = http_client.extra_socket == null; + const exit_when_done = http_client.cdp_client == null; // for debugging // defer self.printWaitAnalysis(); @@ -770,15 +770,15 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult { // Either we have active http connections, or we're in CDP // mode with an extra socket. 
Either way, we're waiting // for http traffic - if (try http_client.tick(@intCast(ms_remaining)) == .extra_socket) { + if (try http_client.tick(@intCast(ms_remaining)) == .cdp_socket) { // exit_when_done is explicitly set when there isn't // an extra socket, so it should not be possibl to - // get an extra_socket message when exit_when_done + // get an cdp_socket message when exit_when_done // is true. std.debug.assert(exit_when_done == false); // data on a socket we aren't handling, return to caller - return .extra_socket; + return .cdp_socket; } }, .html, .complete => { @@ -846,13 +846,13 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult { } else { // We're here because we either have active HTTP // connections, or exit_when_done == false (aka, there's - // an extra_socket registered with the http client). + // an cdp_socket registered with the http client). // We should continue to run lowPriority tasks, so we // minimize how long we'll poll for network I/O. const ms_to_wait = @min(200, @min(ms_remaining, ms_to_next_task orelse 200)); - if (try http_client.tick(ms_to_wait) == .extra_socket) { + if (try http_client.tick(ms_to_wait) == .cdp_socket) { // data on a socket we aren't handling, return to caller - return .extra_socket; + return .cdp_socket; } } }, diff --git a/src/browser/ScriptManager.zig b/src/browser/ScriptManager.zig index 5f55952f..363e1fa1 100644 --- a/src/browser/ScriptManager.zig +++ b/src/browser/ScriptManager.zig @@ -244,6 +244,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e }, }; + const is_blocking = script.mode == .normal; if (remote_url) |url| { errdefer script.deinit(true); @@ -255,6 +256,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e .ctx = script, .method = .GET, .headers = headers, + .blocking = is_blocking, .cookie_jar = &page._session.cookie_jar, .resource_type = .script, .start_callback = if (log.enabled(.http, .debug)) Script.startCallback else 
null, @@ -274,7 +276,7 @@ pub fn addFromElement(self: *ScriptManager, comptime from_parser: bool, script_e } } - if (script.mode != .normal) { + if (is_blocking == false) { const list = self.scriptList(script); list.append(&script.node); return; diff --git a/src/browser/Session.zig b/src/browser/Session.zig index da6b7dd4..43a68296 100644 --- a/src/browser/Session.zig +++ b/src/browser/Session.zig @@ -137,7 +137,7 @@ pub fn currentPage(self: *Session) ?*Page { pub const WaitResult = enum { done, no_page, - extra_socket, + cdp_socket, navigate, }; diff --git a/src/http/Client.zig b/src/http/Client.zig index bb407e2b..03b6b80d 100644 --- a/src/http/Client.zig +++ b/src/http/Client.zig @@ -34,6 +34,8 @@ const ArenaAllocator = std.heap.ArenaAllocator; const errorCheck = Http.errorCheck; const errorMCheck = Http.errorMCheck; +const IS_DEBUG = builtin.mode == .Debug; + const Method = Http.Method; // This is loosely tied to a browser Page. Loading all the , doing @@ -100,10 +102,25 @@ use_proxy: bool, // The complete user-agent header line user_agent: [:0]const u8, -// libcurl can monitor arbitrary sockets. Currently, we ever [maybe] want to -// monitor the CDP client socket, so we've done the simplest thing possible -// by having this single optional field -extra_socket: ?posix.socket_t = null, +cdp_client: ?CDPClient = null, + +// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll +// both HTTP data as well as messages from an CDP connection. +// Furthermore, we have some tension between blocking scripts and request +// interception. For non-blocking scripts, because nothing blocks, we can +// just queue the scripts until we receive a response to the interception +// notification. But for blocking scripts (which block the parser), it's hard +// to return control back to the CDP loop. 
So the `read` function pointer is +// used by the Client to have the CDP client read more data from the socket, +// specifically when we're waiting for a request interception response to +// a blocking script. +pub const CDPClient = struct { + socket: posix.socket_t, + ctx: *anyopaque, + blocking_read_start: *const fn (*anyopaque) bool, + blocking_read: *const fn (*anyopaque) bool, + blocking_read_end: *const fn (*anyopaque) bool, +}; const TransferQueue = std.DoublyLinkedList; @@ -175,7 +192,7 @@ pub fn abort(self: *Client) void { // We can remove some (all?) of these once we're confident its right. std.debug.assert(self.handles.in_use.first == null); std.debug.assert(self.handles.available.len() == self.handles.handles.len); - if (builtin.mode == .Debug) { + if (comptime IS_DEBUG) { var running: c_int = undefined; std.debug.assert(c.curl_multi_perform(self.multi, &running) == c.CURLE_OK); std.debug.assert(running == 0); @@ -200,23 +217,71 @@ pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { pub fn request(self: *Client, req: Request) !void { const transfer = try self.makeTransfer(req); - if (self.notification) |notification| { - notification.dispatch(.http_request_start, &.{ .transfer = transfer }); + const notification = self.notification orelse return self.process(transfer); - var wait_for_interception = false; - notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception }); - if (wait_for_interception) { - self.intercepted += 1; - log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted }); - if (builtin.mode == .Debug) { - transfer._intercepted = true; - } - // The user is send an invitation to intercept this request. 
- return; + notification.dispatch(.http_request_start, &.{ .transfer = transfer }); + + var wait_for_interception = false; + notification.dispatch(.http_request_intercept, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception }); + if (wait_for_interception == false) { + // request not intercepted, process it normally + return self.process(transfer); + } + + self.intercepted += 1; + if (comptime IS_DEBUG) { + log.debug(.http, "wait for interception", .{ .intercepted = self.intercepted }); + } + transfer._intercept_state = .pending; + + if (req.blocking == false) { + // The request was intercepted, but it isn't a blocking request, so we + // don't need to block this call. The request will be unblocked + // asynchronously via continueTransfer, abortTransfer or fulfillTransfer + return; + } + + // The request was intercepted and is blocking. This is messy, but our + // callers, the ScriptManager -> Page, don't have a great way to stop the + // parser and return control to the CDP server to wait for the interception + // response. We have some information on the CDPClient, so we'll do the + // blocking here. (This is a bit of a legacy thing. Initially the Client + // had an 'extra_socket' that it could monitor. It was named 'extra_socket' + // to appear generic, but really, that 'extra_socket' was always the CDP + // socket that we could monitor in libcurl. Because we already had + // the "extra_socket" here, it was easier just to make it even more CDP- + // aware and turn `extra_socket: socket_t` into the current CDPClient). 
+ const cdp_client = self.cdp_client.?; + const ctx = cdp_client.ctx; + + if (cdp_client.blocking_read_start(ctx) == false) { + return error.BlockingInterceptFailure; + } + + // Always leave blocking-read mode, on every exit path below. + defer _ = cdp_client.blocking_read_end(ctx); + + while (true) { + if (cdp_client.blocking_read(ctx) == false) { + return error.BlockingInterceptFailure; + } + + switch (transfer._intercept_state) { + .pending => continue, // keep waiting + .@"continue" => return self.process(transfer), + .abort => { + transfer.abort(); + return; + }, + .fulfilled => { + // callbacks already called, just need to clean up + transfer.deinit(); + return; + }, + .not_intercepted => unreachable, } } - return self.process(transfer); } // Above, request will not process if there's an interception request. In such @@ -232,32 +297,45 @@ fn process(self: *Client, transfer: *Transfer) !void { // For an intercepted request pub fn continueTransfer(self: *Client, transfer: *Transfer) !void { - if (builtin.mode == .Debug) { - std.debug.assert(transfer._intercepted); + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted }); } self.intercepted -= 1; - log.debug(.http, "continue transfer", .{ .intercepted = self.intercepted }); - return self.process(transfer); + + transfer._intercept_state = .@"continue"; + if (!transfer.req.blocking) { + return self.process(transfer); + } } // For an intercepted request pub fn abortTransfer(self: *Client, transfer: *Transfer) void { - if (builtin.mode == .Debug) { - std.debug.assert(transfer._intercepted); + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted }); } self.intercepted -= 1; - log.debug(.http, "abort transfer", .{ .intercepted = self.intercepted }); - transfer.abort(); + + transfer._intercept_state = .abort; + 
if (!transfer.req.blocking) { + transfer.abort(); + } } // For an intercepted request pub fn fulfillTransfer(self: *Client, transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void { - if (builtin.mode == .Debug) { - std.debug.assert(transfer._intercepted); + if (comptime IS_DEBUG) { + std.debug.assert(transfer._intercept_state != .not_intercepted); + log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted }); } self.intercepted -= 1; - log.debug(.http, "filfull transfer", .{ .intercepted = self.intercepted }); - return transfer.fulfill(status, headers, body); + + transfer._intercept_state = .fulfilled; + try transfer.fulfill(status, headers, body); + if (!transfer.req.blocking) { + transfer.deinit(); + } } pub fn nextReqId(self: *Client) usize { @@ -428,7 +506,7 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void { } pub const PerformStatus = enum { - extra_socket, + cdp_socket, normal, }; @@ -444,16 +522,16 @@ fn perform(self: *Client, timeout_ms: c_int) !PerformStatus { } var status = PerformStatus.normal; - if (self.extra_socket) |s| { + if (self.cdp_client) |cdp_client| { var wait_fd = c.curl_waitfd{ - .fd = s, + .fd = cdp_client.socket, .events = c.CURL_WAIT_POLLIN, .revents = 0, }; try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null)); if (wait_fd.revents != 0) { // the extra socket we passed in is ready, let's signal our caller - status = .extra_socket; + status = .cdp_socket; } } else if (running > 0) { try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null)); @@ -476,7 +554,8 @@ fn processMessages(self: *Client) !bool { const transfer = try Transfer.fromEasy(easy); // In case of auth challenge - if (transfer._auth_challenge != null and transfer._tries < 10) { // TODO give a way to configure the number of auth retries. + // TODO give a way to configure the number of auth retries. 
+ if (transfer._auth_challenge != null and transfer._tries < 10) { if (transfer.client.notification) |notification| { var wait_for_interception = false; notification.dispatch(.http_request_auth_required, &.{ .transfer = transfer, .wait_for_interception = &wait_for_interception }); @@ -486,10 +565,10 @@ fn processMessages(self: *Client) !bool { // Note: we don't deinit transfer on purpose: we want to keep // using it for the following request. self.intercepted += 1; - log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted }); - if (builtin.mode == .Debug) { - transfer._intercepted = true; + if (comptime IS_DEBUG) { + log.debug(.http, "wait for auth interception", .{ .intercepted = self.intercepted }); } + transfer._intercept_state = .pending; self.endTransfer(transfer); continue; } @@ -668,6 +747,13 @@ pub const Request = struct { resource_type: ResourceType, credentials: ?[:0]const u8 = null, + // This is only relevant for intercepted requests. If a request is flagged + // as blocking AND is interecepted, then it'll be up to us to wait until + // we receive a response to the interception. This probably isn't ideal, + // but it's harder for our caller (ScriptManager) to deal with this. One + // reason for that is the Http Client is already a bit CDP-aware. + blocking: bool = false, + // arbitrary data that can be associated with this request ctx: *anyopaque = undefined, @@ -768,7 +854,15 @@ pub const Transfer = struct { // for when a Transfer is queued in the client.queue _node: std.DoublyLinkedList.Node = .{}, - _intercepted: if (builtin.mode == .Debug) bool else void = if (builtin.mode == .Debug) false else {}, + _intercept_state: InterceptState = .not_intercepted, + + const InterceptState = enum { + not_intercepted, + pending, + @"continue", + abort, + fulfilled, + }; pub fn reset(self: *Transfer) void { self._redirecting = false; @@ -879,11 +973,11 @@ pub const Transfer = struct { // abort. 
We don't call self.client.endTransfer here b/c it has been done // before interception process. pub fn abortAuthChallenge(self: *Transfer) void { - if (builtin.mode == .Debug) { - std.debug.assert(self._intercepted); + if (comptime IS_DEBUG) { + std.debug.assert(self._intercept_state != .not_intercepted); + log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted }); } self.client.intercepted -= 1; - log.debug(.http, "abort auth transfer", .{ .intercepted = self.client.intercepted }); self.client.requestFailed(self, error.AbortAuthChallenge); self.deinit(); } @@ -990,7 +1084,9 @@ pub const Transfer = struct { if (std.mem.startsWith(u8, header, "HTTP/")) { // Is it the first header line. if (buf_len < 13) { - log.debug(.http, "invalid response line", .{ .line = header }); + if (comptime IS_DEBUG) { + log.debug(.http, "invalid response line", .{ .line = header }); + } return 0; } const version_start: usize = if (header[5] == '2') 7 else 9; @@ -1001,7 +1097,9 @@ pub const Transfer = struct { std.debug.assert(version_end < 13); const status = std.fmt.parseInt(u16, header[version_start..version_end], 10) catch { - log.debug(.http, "invalid status code", .{ .line = header }); + if (comptime IS_DEBUG) { + log.debug(.http, "invalid status code", .{ .line = header }); + } return 0; }; @@ -1058,7 +1156,9 @@ pub const Transfer = struct { if (transfer._redirecting) { // parse and set cookies for the redirection. redirectionCookies(transfer, easy) catch |err| { - log.debug(.http, "redirection cookies", .{ .err = err }); + if (comptime IS_DEBUG) { + log.debug(.http, "redirection cookies", .{ .err = err }); + } return 0; }; return buf_len; @@ -1128,7 +1228,7 @@ pub const Transfer = struct { pub fn fulfill(transfer: *Transfer, status: u16, headers: []const Http.Header, body: ?[]const u8) !void { if (transfer._handle != null) { // should never happen, should have been intercepted/paused, and then - // either continued, aborted and fulfilled once. 
+ // either continued, aborted or fulfilled once. @branchHint(.unlikely); return error.RequestInProgress; } diff --git a/src/http/Http.zig b/src/http/Http.zig index 1a558029..8d6dcacb 100644 --- a/src/http/Http.zig +++ b/src/http/Http.zig @@ -90,12 +90,13 @@ pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { }; } -pub fn monitorSocket(self: *Http, socket: posix.socket_t) void { - self.client.extra_socket = socket; +pub fn addCDPClient(self: *Http, cdp_client: Client.CDPClient) void { + std.debug.assert(self.client.cdp_client == null); + self.client.cdp_client = cdp_client; } -pub fn unmonitorSocket(self: *Http) void { - self.client.extra_socket = null; +pub fn removeCDPClient(self: *Http) void { + self.client.cdp_client = null; } pub fn newConnection(self: *Http) !Connection {