Merge pull request #1003 from lightpanda-io/http_always_monitor_cdp

Http always monitor cdp
Karl Seguin
2025-09-03 20:35:49 +08:00
committed by GitHub
14 changed files with 239 additions and 199 deletions

View File

@@ -44,6 +44,7 @@ pub fn reset(self: *Scheduler) void {
 const AddOpts = struct {
     name: []const u8 = "",
+    low_priority: bool = false,
 };
 
 pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: AddOpts) !void {
     if (ms > 5_000) {
@@ -51,7 +52,9 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: Ad
         // ignore any task that we're almost certainly never going to run
         return;
     }
-    return self.primary.add(.{
+
+    var q = if (opts.low_priority) &self.secondary else &self.primary;
+    return q.add(.{
         .ms = std.time.milliTimestamp() + ms,
         .ctx = ctx,
         .func = func,
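
This small scheduler change is what the rest of the commit builds on: tasks flagged low_priority land in a secondary queue that Page.wait drains opportunistically via runLowPriority. A minimal, self-contained sketch of that routing (the field names primary/secondary/low_priority come from the hunk above; the queues here are plain counters, not the repo's implementation):

const std = @import("std");

// Stand-in scheduler for illustration only: two "queues" modeled as counters.
const Scheduler = struct {
    primary: u32 = 0,
    secondary: u32 = 0,

    const AddOpts = struct {
        name: []const u8 = "",
        low_priority: bool = false,
    };

    fn add(self: *Scheduler, opts: AddOpts) void {
        // Same routing as the patch: low-priority tasks (e.g. animation
        // frames) go to the secondary queue, everything else stays primary.
        const q = if (opts.low_priority) &self.secondary else &self.primary;
        q.* += 1;
    }
};

pub fn main() void {
    var s = Scheduler{};
    s.add(.{ .name = "animationFrame", .low_priority = true });
    s.add(.{ .name = "setTimeout" });
    std.debug.print("primary={d} secondary={d}\n", .{ s.primary, s.secondary });
}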

View File

@@ -285,7 +285,7 @@ pub fn blockingGet(self: *ScriptManager, url: [:0]const u8) !BlockingResult {
     // rely on http's timeout settings to avoid an endless/long loop.
     while (true) {
-        _ = try client.tick(.{ .timeout_ms = 200 });
+        _ = try client.tick(200);
         switch (blocking.state) {
             .running => {},
             .done => |result| return result,
@@ -428,7 +428,7 @@ fn errorCallback(ctx: *anyopaque, err: anyerror) void {
 // It could be pending because:
 // (a) we're still downloading its content or
 // (b) this is a non-async script that has to be executed in order
-const PendingScript = struct {
+pub const PendingScript = struct {
     script: Script,
     complete: bool,
     node: OrderList.Node,

View File

@@ -215,7 +215,11 @@ pub const Window = struct {
     }
 
     pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 {
-        return self.createTimeout(cbk, 5, page, .{ .animation_frame = true, .name = "animationFrame" });
+        return self.createTimeout(cbk, 5, page, .{
+            .animation_frame = true,
+            .name = "animationFrame",
+            .low_priority = true,
+        });
     }
 
     pub fn _cancelAnimationFrame(self: *Window, id: u32) !void {
@@ -269,6 +273,7 @@ pub const Window = struct {
         args: []Env.JsObject = &.{},
         repeat: bool = false,
         animation_frame: bool = false,
+        low_priority: bool = false,
     };
 
     fn createTimeout(self: *Window, cbk: Function, delay_: ?u32, page: *Page, opts: CreateTimeoutOpts) !u32 {
         const delay = delay_ orelse 0;
@@ -319,7 +324,10 @@ pub const Window = struct {
             .repeat = if (opts.repeat) delay + 1 else null,
         };
 
-        try page.scheduler.add(callback, TimerCallback.run, delay, .{ .name = opts.name });
+        try page.scheduler.add(callback, TimerCallback.run, delay, .{
+            .name = opts.name,
+            .low_priority = opts.low_priority,
+        });
 
         return timer_id;
     }

View File

@@ -90,16 +90,6 @@ pub const Page = struct {
     load_state: LoadState = .parsing,
 
-    // Page.wait balances waiting for resources / tasks and producing an output.
-    // Up until a timeout, Page.wait will always wait for inflight or pending
-    // HTTP requests, via the Http.Client.active counter. However, intercepted
-    // requests (via CDP, but it could be anything), aren't considered "active"
-    // connection. So it's possible that we have intercepted requests (which are
-    // pending on some driver to continue/abort) while Http.Client.active == 0.
-    // This boolean exists to supplment Http.Client.active and inform Page.wait
-    // of pending connections.
-    request_intercepted: bool = false,
-
     const Mode = union(enum) {
         pre: void,
         err: anyerror,
@@ -262,23 +252,26 @@ pub const Page = struct {
         return self.script_manager.blockingGet(src);
     }
 
-    pub fn wait(self: *Page, wait_sec: u16) void {
-        self._wait(wait_sec) catch |err| switch (err) {
-            error.JsError => {}, // already logged (with hopefully more context)
-            else => {
-                // There may be errors from the http/client or ScriptManager
-                // that we should not treat as an error like this. Will need
-                // to run this through more real-world sites and see if we need
-                // to expand the switch (err) to have more customized logs for
-                // specific messages.
-                log.err(.browser, "page wait", .{ .err = err });
-            },
+    pub fn wait(self: *Page, wait_ms: i32) Session.WaitResult {
+        return self._wait(wait_ms) catch |err| {
+            switch (err) {
+                error.JsError => {}, // already logged (with hopefully more context)
+                else => {
+                    // There may be errors from the http/client or ScriptManager
+                    // that we should not treat as an error like this. Will need
+                    // to run this through more real-world sites and see if we need
+                    // to expand the switch (err) to have more customized logs for
+                    // specific messages.
+                    log.err(.browser, "page wait", .{ .err = err });
+                },
+            }
+            return .done;
         };
     }
 
-    fn _wait(self: *Page, wait_sec: u16) !void {
+    fn _wait(self: *Page, wait_ms: i32) !Session.WaitResult {
         var timer = try std.time.Timer.start();
-        var ms_remaining: i32 = @intCast(wait_sec * 1000);
+        var ms_remaining = wait_ms;
 
         var try_catch: Env.TryCatch = undefined;
         try_catch.init(self.main_context);
@@ -287,42 +280,48 @@ pub const Page = struct {
         var scheduler = &self.scheduler;
         var http_client = self.http_client;
 
+        // I'd like the page to know NOTHING about extra_socket / CDP, but the
+        // fact is that the behavior of wait changes depending on whether or
+        // not we're using CDP.
+        // If we aren't using CDP, as soon as we think there's nothing left
+        // to do, we can exit - we'de done.
+        // But if we are using CDP, we should wait for the whole `wait_ms`
+        // because the http_click.tick() also monitors the CDP socket. And while
+        // we could let CDP poll http (like it does for HTTP requests), the fact
+        // is that we know more about the timing of stuff (e.g. how long to
+        // poll/sleep) in the page.
+        const exit_when_done = http_client.extra_socket == null;
+
         // for debugging
         // defer self.printWaitAnalysis();
 
         while (true) {
-            SW: switch (self.mode) {
+            switch (self.mode) {
                 .pre, .raw, .text => {
-                    if (self.request_intercepted) {
-                        // the page request was intercepted.
-                        // there shouldn't be any active requests;
-                        std.debug.assert(http_client.active == 0);
-
-                        // nothing we can do for this, need to kick the can up
-                        // the chain and wait for activity (e.g. a CDP message)
-                        // to unblock this.
-                        return;
-                    }
-
                     // The main page hasn't started/finished navigating.
                     // There's no JS to run, and no reason to run the scheduler.
-                    if (http_client.active == 0) {
+                    if (http_client.active == 0 and exit_when_done) {
                         // haven't started navigating, I guess.
-                        return;
+                        return .done;
                     }
 
-                    // There should only be 1 active http transfer, the main page
-                    _ = try http_client.tick(.{ .timeout_ms = ms_remaining });
+                    // Either we have active http connections, or we're in CDP
+                    // mode with an extra socket. Either way, we're waiting
+                    // for http traffic
+                    if (try http_client.tick(ms_remaining) == .extra_socket) {
+                        // data on a socket we aren't handling, return to caller
+                        return .extra_socket;
+                    }
                 },
                 .html, .parsed => {
                     // The HTML page was parsed. We now either have JS scripts to
-                    // download, or timeouts to execute, or both.
+                    // download, or scheduled tasks to execute, or both.
                     // scheduler.run could trigger new http transfers, so do not
                     // store http_client.active BEFORE this call and then use
                     // it AFTER.
                     const ms_to_next_task = try scheduler.runHighPriority();
+                    _ = try scheduler.runLowPriority();
 
                     if (try_catch.hasCaught()) {
                         const msg = (try try_catch.err(self.arena)) orelse "unknown";
@@ -330,69 +329,64 @@ pub const Page = struct {
                         return error.JsError;
                     }
 
-                    if (http_client.active == 0) {
-                        if (ms_to_next_task) |ms| {
-                            // There are no HTTP transfers, so there's no point calling
-                            // http_client.tick.
-                            // TODO: should we just force-run the scheduler??
-
-                            if (ms > ms_remaining) {
-                                // we'd wait to long, might as well exit early.
-                                return;
-                            }
-
-                            _ = try scheduler.runLowPriority();
-
-                            // We must use a u64 here b/c ms is a u32 and the
-                            // conversion to ns can generate an integer
-                            // overflow.
-                            const _ms: u64 = @intCast(ms);
-                            std.Thread.sleep(std.time.ns_per_ms * _ms);
-                            break :SW;
-                        }
-
-                        // We have no active http transfer and no pending
-                        // schedule tasks. We're done
-                        return;
-                    }
-
-                    _ = try scheduler.runLowPriority();
-
-                    const request_intercepted = self.request_intercepted;
-
-                    // We want to prioritize processing intercepted requests
-                    // because, the sooner they get unblocked, the sooner we
-                    // can start the HTTP request. But we still want to advanced
-                    // existing HTTP requests, if possible. So, if we have
-                    // intercepted requests, we'll still look at existing HTTP
-                    // requests, but we won't block waiting for more data.
-                    const ms_to_wait =
-                        if (request_intercepted) 0
-                        // But if we have no intercepted requests, we'll wait
-                        // for as long as we can for data to our existing
-                        // inflight requests
-                        else @min(ms_remaining, ms_to_next_task orelse 1000);
-
-                    _ = try http_client.tick(.{ .timeout_ms = ms_to_wait });
-
-                    if (request_intercepted) {
-                        // Again, proritizing intercepted requests. Exit this
-                        // loop so that our caller can hopefully resolve them
-                        // (i.e. continue or abort them);
-                        return;
+                    if (http_client.active == 0 and exit_when_done) {
+                        const ms = ms_to_next_task orelse blk: {
+                            // TODO: when jsRunner is fully replaced with the
+                            // htmlRunner, we can remove the first part of this
+                            // condition. jsRunner calls `page.wait` far too
+                            // often to enforce this.
+                            if (wait_ms > 100 and wait_ms - ms_remaining < 100) {
+                                // Look, we want to exit ASAP, but we don't want
+                                // to exit so fast that we've run none of the
+                                // background jobs.
+                                break :blk 50;
+                            }
+
+                            // no http transfers, no cdp extra socket, no
+                            // scheduled tasks, we're done.
+                            return .done;
+                        };
+
+                        if (ms > ms_remaining) {
+                            // same as above, except we have a scheduled task,
+                            // it just happens to be too far into the future.s
+                            return .done;
+                        }
+
+                        // we have a task to run in the not-so-distant future.
+                        // You might think we can just sleep until that task is
+                        // ready, but we should continue to run lowPriority tasks
+                        // in the meantime, and that could unblock things. So
+                        // we'll just sleep for a bit, and then restart our wait
+                        // loop to see what's changed
+                        std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intCast(@min(ms, 20))));
+                    } else {
+                        // We're here because we either have active HTTP
+                        // connections, of exit_when_done == false (aka, there's
+                        // an extra_socket registered with the http client).
+                        const ms_to_wait = @min(ms_remaining, ms_to_next_task orelse 100);
+                        if (try http_client.tick(ms_to_wait) == .extra_socket) {
+                            // data on a socket we aren't handling, return to caller
+                            return .extra_socket;
+                        }
                     }
                 },
                 .err => |err| {
                     self.mode = .{ .raw_done = @errorName(err) };
                     return err;
                 },
-                .raw_done => return,
+                .raw_done => {
+                    if (exit_when_done) {
+                        return .done;
+                    }
+
+                    // we _could_ http_client.tick(ms_to_wait), but this has
+                    // the same result, and I feel is more correct.
+                    return .no_page;
+                }
             }
 
             const ms_elapsed = timer.lap() / 1_000_000;
             if (ms_elapsed >= ms_remaining) {
-                return;
+                return .done;
             }
             ms_remaining -= @intCast(ms_elapsed);
         }
@@ -405,48 +399,53 @@ pub const Page = struct {
std.debug.print("\nactive requests: {d}\n", .{self.http_client.active}); std.debug.print("\nactive requests: {d}\n", .{self.http_client.active});
var n_ = self.http_client.handles.in_use.first; var n_ = self.http_client.handles.in_use.first;
while (n_) |n| { while (n_) |n| {
const transfer = Http.Transfer.fromEasy(n.data.conn.easy) catch |err| { const handle: *Http.Client.Handle = @fieldParentPtr("node", n);
const transfer = Http.Transfer.fromEasy(handle.conn.easy) catch |err| {
std.debug.print(" - failed to load transfer: {any}\n", .{err}); std.debug.print(" - failed to load transfer: {any}\n", .{err});
break; break;
}; };
std.debug.print(" - {s}\n", .{transfer}); std.debug.print(" - {f}\n", .{transfer});
n_ = n.next; n_ = n.next;
} }
} }
{ {
std.debug.print("\nqueued requests: {d}\n", .{self.http_client.queue.len}); std.debug.print("\nqueued requests: {d}\n", .{self.http_client.queue.len()});
var n_ = self.http_client.queue.first; var n_ = self.http_client.queue.first;
while (n_) |n| { while (n_) |n| {
std.debug.print(" - {s}\n", .{n.data.url}); const transfer: *Http.Transfer = @fieldParentPtr("_node", n);
std.debug.print(" - {f}\n", .{transfer.uri});
n_ = n.next; n_ = n.next;
} }
} }
{ {
std.debug.print("\nscripts: {d}\n", .{self.script_manager.scripts.len}); std.debug.print("\nscripts: {d}\n", .{self.script_manager.scripts.len()});
var n_ = self.script_manager.scripts.first; var n_ = self.script_manager.scripts.first;
while (n_) |n| { while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete }); const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next; n_ = n.next;
} }
} }
{ {
std.debug.print("\ndeferreds: {d}\n", .{self.script_manager.deferreds.len}); std.debug.print("\ndeferreds: {d}\n", .{self.script_manager.deferreds.len()});
var n_ = self.script_manager.deferreds.first; var n_ = self.script_manager.deferreds.first;
while (n_) |n| { while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete }); const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next; n_ = n.next;
} }
} }
const now = std.time.milliTimestamp(); const now = std.time.milliTimestamp();
{ {
std.debug.print("\nasyncs: {d}\n", .{self.script_manager.asyncs.len}); std.debug.print("\nasyncs: {d}\n", .{self.script_manager.asyncs.len()});
var n_ = self.script_manager.asyncs.first; var n_ = self.script_manager.asyncs.first;
while (n_) |n| { while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete }); const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next; n_ = n.next;
} }
} }
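
To summarize the new Page.wait contract without the surrounding plumbing: the return value tells the caller why the wait ended, and exit_when_done flips the behavior depending on whether an extra (CDP) socket is registered with the http client. A rough, self-contained model of that decision (only the names WaitResult and exit_when_done come from the diff; the rest is a simplified stand-in):

const std = @import("std");

const WaitResult = enum { done, no_page, extra_socket };

// Simplified model: `active` is the number of in-flight HTTP transfers,
// `has_extra_socket` says whether a CDP client socket is registered, and
// `socket_ready` is what polling reported for that socket.
fn wait(active: usize, has_extra_socket: bool, socket_ready: bool) WaitResult {
    // Without CDP we exit as soon as there's nothing left to do; with CDP
    // we keep polling so libcurl can also watch the client socket.
    const exit_when_done = !has_extra_socket;
    if (active == 0 and exit_when_done) return .done;
    if (socket_ready) return .extra_socket;
    return .done; // timeout elapsed with nothing to hand back
}

pub fn main() void {
    std.debug.print("{s}\n", .{@tagName(wait(0, true, true))}); // extra_socket
    std.debug.print("{s}\n", .{@tagName(wait(0, false, false))}); // done
}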

View File

@@ -137,7 +137,13 @@ pub const Session = struct {
         return &(self.page orelse return null);
     }
 
-    pub fn wait(self: *Session, wait_sec: u16) void {
+    pub const WaitResult = enum {
+        done,
+        no_page,
+        extra_socket,
+    };
+
+    pub fn wait(self: *Session, wait_ms: i32) WaitResult {
         if (self.queued_navigation) |qn| {
             // This was already aborted on the page, but it would be pretty
             // bad if old requests went to the new page, so let's make double sure
@@ -154,21 +160,24 @@ pub const Session = struct {
                     .err = err,
                     .url = qn.url,
                 });
-                return;
+                return .done;
             };
 
             page.navigate(qn.url, qn.opts) catch |err| {
                 log.err(.browser, "queued navigation error", .{ .err = err, .url = qn.url });
-                return;
+                return .done;
             };
         }
 
         if (self.page) |*page| {
-            page.wait(wait_sec);
+            return page.wait(wait_ms);
         }
+
+        return .no_page;
     }
 };
 
 const QueuedNavigation = struct {
     url: []const u8,
     opts: NavigateOpts,

View File

@@ -103,7 +103,6 @@ pub fn CDPT(comptime TypeProvider: type) type {
         pub fn handleMessage(self: *Self, msg: []const u8) bool {
             // if there's an error, it's already been logged
             self.processMessage(msg) catch return false;
-            self.pageWait();
             return true;
         }
@@ -117,11 +116,13 @@ pub fn CDPT(comptime TypeProvider: type) type {
         // A bit hacky right now. The main server loop doesn't unblock for
         // scheduled task. So we run this directly in order to process any
         // timeouts (or http events) which are ready to be processed.
-        pub fn pageWait(self: *Self) void {
-            const session = &(self.browser.session orelse return);
-            // exits early if there's nothing to do, so a large value like
-            // 5 seconds should be ok
-            session.wait(5);
+        pub fn hasPage() bool {
+        }
+
+        pub fn pageWait(self: *Self, ms: i32) Session.WaitResult {
+            const session = &(self.browser.session orelse return .no_page);
+            return session.wait(ms);
         }
 
         // Called from above, in processMessage which handles client messages

View File

@@ -182,7 +182,6 @@ pub fn requestIntercept(arena: Allocator, bc: anytype, intercept: *const Notific
     // unreachable because we _have_ to have a page.
     const session_id = bc.session_id orelse unreachable;
     const target_id = bc.target_id orelse unreachable;
-    const page = bc.session.currentPage() orelse unreachable;
 
     // We keep it around to wait for modifications to the request.
     // NOTE: we assume whomever created the request created it with a lifetime of the Page.
@@ -211,7 +210,6 @@ pub fn requestIntercept(arena: Allocator, bc: anytype, intercept: *const Notific
     // Await either continueRequest, failRequest or fulfillRequest
     intercept.wait_for_interception.* = true;
-    page.request_intercepted = true;
 }
 
 fn continueRequest(cmd: anytype) !void {
@@ -229,8 +227,6 @@ fn continueRequest(cmd: anytype) !void {
         return error.NotImplemented;
     }
 
-    const page = bc.session.currentPage() orelse return error.PageNotLoaded;
-
     var intercept_state = &bc.intercept_state;
     const request_id = try idFromRequestId(params.requestId);
     const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound;
@@ -266,11 +262,6 @@ fn continueRequest(cmd: anytype) !void {
     }
 
     try bc.cdp.browser.http_client.process(transfer);
-
-    if (intercept_state.empty()) {
-        page.request_intercepted = false;
-    }
-
     return cmd.sendResult(null, .{});
 }
@@ -292,8 +283,6 @@ fn continueWithAuth(cmd: anytype) !void {
         },
     })) orelse return error.InvalidParams;
 
-    const page = bc.session.currentPage() orelse return error.PageNotLoaded;
-
     var intercept_state = &bc.intercept_state;
     const request_id = try idFromRequestId(params.requestId);
     const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound;
@@ -323,11 +312,6 @@ fn continueWithAuth(cmd: anytype) !void {
     transfer.reset();
     try bc.cdp.browser.http_client.process(transfer);
-
-    if (intercept_state.empty()) {
-        page.request_intercepted = false;
-    }
-
     return cmd.sendResult(null, .{});
 }
@@ -380,8 +364,6 @@ fn failRequest(cmd: anytype) !void {
         errorReason: ErrorReason,
     })) orelse return error.InvalidParams;
 
-    const page = bc.session.currentPage() orelse return error.PageNotLoaded;
-
     var intercept_state = &bc.intercept_state;
     const request_id = try idFromRequestId(params.requestId);
@@ -394,10 +376,6 @@ fn failRequest(cmd: anytype) !void {
         .url = transfer.uri,
         .reason = params.errorReason,
     });
-
-    if (intercept_state.empty()) {
-        page.request_intercepted = false;
-    }
-
     return cmd.sendResult(null, .{});
 }
@@ -405,7 +383,6 @@ pub fn requestAuthRequired(arena: Allocator, bc: anytype, intercept: *const Noti
     // unreachable because we _have_ to have a page.
     const session_id = bc.session_id orelse unreachable;
     const target_id = bc.target_id orelse unreachable;
-    const page = bc.session.currentPage() orelse unreachable;
 
     // We keep it around to wait for modifications to the request.
     // NOTE: we assume whomever created the request created it with a lifetime of the Page.
@@ -442,7 +419,6 @@ pub fn requestAuthRequired(arena: Allocator, bc: anytype, intercept: *const Noti
     // Await continueWithAuth
     intercept.wait_for_interception.* = true;
-    page.request_intercepted = true;
 }
 
 // Get u64 from requestId which is formatted as: "INTERCEPT-{d}"

View File

@@ -164,10 +164,7 @@ pub fn pageNavigate(arena: Allocator, bc: anytype, event: *const Notification.Pa
     var cdp = bc.cdp;
 
-    if (event.opts.reason != .address_bar) {
-        bc.loader_id = bc.cdp.loader_id_gen.next();
-    }
+    bc.loader_id = bc.cdp.loader_id_gen.next();
 
     const loader_id = bc.loader_id;
     const target_id = bc.target_id orelse unreachable;
     const session_id = bc.session_id orelse unreachable;

View File

@@ -87,6 +87,11 @@ notification: ?*Notification = null,
 // restoring, this originally-configured value is what it goes to.
 http_proxy: ?[:0]const u8 = null,
 
+// libcurl can monitor arbitrary sockets. Currently, we ever [maybe] want to
+// monitor the CDP client socket, so we've done the simplest thing possible
+// by having this single optional field
+extra_socket: ?posix.socket_t = null,
+
 const TransferQueue = std.DoublyLinkedList;
 
 pub fn init(allocator: Allocator, ca_blob: ?c.curl_blob, opts: Http.Opts) !*Client {
@@ -162,11 +167,7 @@ pub fn abort(self: *Client) void {
     }
 }
 
-const TickOpts = struct {
-    timeout_ms: i32 = 0,
-    poll_socket: ?posix.socket_t = null,
-};
-pub fn tick(self: *Client, opts: TickOpts) !bool {
+pub fn tick(self: *Client, timeout_ms: i32) !PerformStatus {
     while (true) {
         if (self.handles.hasAvailable() == false) {
             break;
@@ -178,7 +179,7 @@ pub fn tick(self: *Client, opts: TickOpts) !bool {
         const handle = self.handles.getFreeHandle().?;
         try self.makeRequest(handle, transfer);
     }
-    return self.perform(opts.timeout_ms, opts.poll_socket);
+    return self.perform(timeout_ms);
 }
 
 pub fn request(self: *Client, req: Request) !void {
@@ -342,15 +343,25 @@ fn makeRequest(self: *Client, handle: *Handle, transfer: *Transfer) !void {
     }
     self.active += 1;
-    _ = try self.perform(0, null);
+    _ = try self.perform(0);
 }
 
-fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
+pub const PerformStatus = enum{
+    extra_socket,
+    normal,
+};
+
+fn perform(self: *Client, timeout_ms: c_int) !PerformStatus {
     const multi = self.multi;
 
     var running: c_int = undefined;
     try errorMCheck(c.curl_multi_perform(multi, &running));
 
-    if (socket) |s| {
+    // We're potentially going to block for a while until we get data. Process
+    // whatever messages we have waiting ahead of time.
+    try self.processMessages();
+
+    var status = PerformStatus.normal;
+    if (self.extra_socket) |s| {
         var wait_fd = c.curl_waitfd{
             .fd = s,
             .events = c.CURL_WAIT_POLLIN,
@@ -359,12 +370,18 @@ fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
         try errorMCheck(c.curl_multi_poll(multi, &wait_fd, 1, timeout_ms, null));
         if (wait_fd.revents != 0) {
             // the extra socket we passed in is ready, let's signal our caller
-            return true;
+            status = .extra_socket;
         }
-    } else if (running > 0 and timeout_ms > 0) {
+    } else if (running > 0) {
         try errorMCheck(c.curl_multi_poll(multi, null, 0, timeout_ms, null));
     }
 
+    try self.processMessages();
+    return status;
+}
+
+fn processMessages(self: *Client) !void {
+    const multi = self.multi;
     var messages_count: c_int = 0;
     while (c.curl_multi_info_read(multi, &messages_count)) |msg_| {
         const msg: *c.CURLMsg = @ptrCast(msg_);
@@ -422,8 +439,6 @@ fn perform(self: *Client, timeout_ms: c_int, socket: ?posix.socket_t) !bool {
             self.requestFailed(transfer, err);
         }
     }
-
-    return false;
 }
 
 fn endTransfer(self: *Client, transfer: *Transfer) void {
@@ -508,7 +523,7 @@ const Handles = struct {
 };
 
 // wraps a c.CURL (an easy handle)
-const Handle = struct {
+pub const Handle = struct {
     client: *Client,
     conn: Http.Connection,
     node: Handles.HandleList.Node,
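
The key libcurl trick in this file is that curl_multi_poll can wait on one arbitrary extra descriptor alongside the in-flight transfers, which is how a single blocking call now serves both HTTP traffic and the CDP client socket. A loose, self-contained analogue of that pattern using std.posix.poll in place of curl (nothing below is the repo's code; it only mirrors the PerformStatus idea from the hunk above):

const std = @import("std");
const posix = std.posix;

const PerformStatus = enum { normal, extra_socket };

// Analogue of Client.perform's extra-socket handling: block for up to
// `timeout_ms`, and report whether the extra descriptor woke us up.
fn pollWithExtra(extra: ?posix.socket_t, timeout_ms: i32) !PerformStatus {
    const fd = extra orelse {
        // No extra socket registered: nothing special to report.
        std.Thread.sleep(@as(u64, @intCast(timeout_ms)) * std.time.ns_per_ms);
        return .normal;
    };
    var fds = [_]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};
    _ = try posix.poll(&fds, timeout_ms);
    return if (fds[0].revents != 0) .extra_socket else .normal;
}

pub fn main() !void {
    std.debug.print("{s}\n", .{@tagName(try pollWithExtra(null, 10))}); // normal
}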

View File

@@ -83,16 +83,21 @@ pub fn deinit(self: *Http) void {
     self.arena.deinit();
 }
 
-pub fn poll(self: *Http, timeout_ms: i32, socket: posix.socket_t) bool {
-    return self.client.tick(.{
-        .timeout_ms = timeout_ms,
-        .poll_socket = socket,
-    }) catch |err| {
+pub fn poll(self: *Http, timeout_ms: i32) Client.PerformStatus {
+    return self.client.tick(timeout_ms) catch |err| {
         log.err(.app, "http poll", .{ .err = err });
-        return false;
+        return .normal;
     };
 }
 
+pub fn monitorSocket(self: *Http, socket: posix.socket_t) void {
+    self.client.extra_socket = socket;
+}
+
+pub fn unmonitorSocket(self: *Http) void {
+    self.client.extra_socket = null;
+}
+
 pub fn newConnection(self: *Http) !Connection {
     return Connection.init(self.ca_blob, &self.opts);
 }

View File

@@ -137,7 +137,7 @@ fn run(alloc: Allocator) !void {
     const server = &_server.?;
     defer server.deinit();
 
-    server.run(address, opts.timeout) catch |err| {
+    server.run(address, opts.timeout * 1000) catch |err| {
         log.fatal(.app, "server run error", .{ .err = err });
         return err;
     };
@@ -166,7 +166,7 @@ fn run(alloc: Allocator) !void {
         },
     };
 
-    session.wait(5); // 5 seconds
+    _ = session.wait(5000); // 5 seconds
 
     // dump
     if (opts.dump) {

View File

@@ -109,7 +109,7 @@ fn run(
     const url = try std.fmt.allocPrint(arena, "http://localhost:9582/{s}", .{test_file});
     try page.navigate(url, .{});
-    page.wait(2);
+    _ = page.wait(2000);
 
     const js_context = page.main_context;
     var try_catch: Env.TryCatch = undefined;

View File

@@ -124,44 +124,57 @@ pub const Server = struct {
         client.* = try Client.init(socket, self);
         defer client.deinit();
 
-        var last_message = timestamp();
         var http = &self.app.http;
+        http.monitorSocket(socket);
+        defer http.unmonitorSocket();
+
+        std.debug.assert(client.mode == .http);
         while (true) {
-            if (http.poll(20, socket)) {
-                const n = posix.read(socket, client.readBuf()) catch |err| {
-                    log.warn(.app, "CDP read", .{ .err = err });
-                    return;
-                };
-                if (n == 0) {
-                    log.info(.app, "CDP disconnect", .{});
-                    return;
-                }
-                const more = client.processData(n) catch false;
-                if (!more) {
-                    return;
-                }
-                last_message = timestamp();
-            } else if (timestamp() - last_message > timeout_ms) {
+            if (http.poll(timeout_ms) != .extra_socket) {
                 log.info(.app, "CDP timeout", .{});
                 return;
             }
 
-            // We have 3 types of "events":
-            // - Incoming CDP messages
-            // - Network events from the browser
-            // - Timeouts from the browser
-            // The call to http.poll above handles the first two (which is why
-            // we pass the client socket to it). But browser timeouts aren't
-            // hooked into that. So we need to go to the browser page (if there
-            // is one), and ask it to process any pending events. That action
-            // doesn't starve #2 (Network events from the browser), because
-            // page.wait() handles that too. But it does starve #1 (Incoming CDP
-            // messages). The good news is that, while the Page is mostly
-            // unaware of CDP, it will only block if it actually has something to
-            // do AND it knows if we're waiting on an intercept request, and will
-            // eagerly return control here in those cases.
+            if (try client.readSocket() == false) {
+                return;
+            }
+
             if (client.mode == .cdp) {
-                client.mode.cdp.pageWait();
+                break; // switch to our CDP loop
+            }
+        }
+
+        var cdp = &client.mode.cdp;
+        var last_message = timestamp();
+        var ms_remaining = timeout_ms;
+        while (true) {
+            switch (cdp.pageWait(ms_remaining)) {
+                .extra_socket => {
+                    if (try client.readSocket() == false) {
+                        return;
+                    }
+                    last_message = timestamp();
+                    ms_remaining = timeout_ms;
+                },
+                .no_page => {
+                    if (http.poll(ms_remaining) != .extra_socket) {
+                        log.info(.app, "CDP timeout", .{});
+                        return;
+                    }
+                    if (try client.readSocket() == false) {
+                        return;
+                    }
+                    last_message = timestamp();
+                    ms_remaining = timeout_ms;
+                },
+                .done => {
+                    const elapsed = timestamp() - last_message;
+                    if (elapsed > ms_remaining) {
+                        log.info(.app, "CDP timeout", .{});
+                        return;
+                    }
+                    ms_remaining -= @as(i32, @intCast(elapsed));
+                },
             }
         }
     }
@@ -218,6 +231,20 @@ pub const Client = struct {
         self.send_arena.deinit();
     }
 
+    fn readSocket(self: *Client) !bool {
+        const n = posix.read(self.socket, self.readBuf()) catch |err| {
+            log.warn(.app, "CDP read", .{ .err = err });
+            return false;
+        };
+
+        if (n == 0) {
+            log.info(.app, "CDP disconnect", .{});
+            return false;
+        }
+
+        return self.processData(n) catch false;
+    }
+
     fn readBuf(self: *Client) []u8 {
         return self.reader.readBuf();
     }
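
The second loop's timeout accounting is easy to miss in the diff: the idle budget (ms_remaining) only shrinks across consecutive .done results and is reset to the full timeout whenever a client message arrives. A toy, self-contained model of just that bookkeeping (the event list is made up purely for illustration):

const std = @import("std");

pub fn main() void {
    const timeout_ms: i32 = 100;
    var ms_remaining = timeout_ms;

    // Each entry: how long pageWait/poll took, and whether a CDP message arrived.
    const events = [_]struct { elapsed: i32, got_message: bool }{
        .{ .elapsed = 40, .got_message = false },
        .{ .elapsed = 30, .got_message = true }, // resets the idle budget
        .{ .elapsed = 90, .got_message = false },
        .{ .elapsed = 20, .got_message = false }, // exceeds what's left
    };

    for (events) |e| {
        if (e.got_message) {
            ms_remaining = timeout_ms;
            continue;
        }
        if (e.elapsed > ms_remaining) {
            std.debug.print("CDP timeout\n", .{});
            return;
        }
        ms_remaining -= e.elapsed;
    }
}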

View File

@@ -424,7 +424,7 @@ pub const JsRunner = struct {
             }
             return err;
         };
-        self.page.session.wait(1);
+        _ = self.page.session.wait(100);
 
         @import("root").js_runner_duration += std.time.Instant.since(try std.time.Instant.now(), start);
         if (case.@"1") |expected| {
@@ -518,7 +518,7 @@ pub fn htmlRunner(file: []const u8) !void {
     const url = try std.fmt.allocPrint(arena_allocator, "http://localhost:9582/src/tests/{s}", .{file});
     try page.navigate(url, .{});
-    page.wait(2);
+    _ = page.wait(2000);
 
     const value = js_context.exec("testing.getStatus()", "testing.getStatus()") catch |err| {
         const msg = try_catch.err(arena_allocator) catch @errorName(err) orelse "unknown";