Rework page wait again

Further reducing bouncing between page and server for loop polling. If there is
a page, the page polls. If there isn't a page, the server polls. Simpler.
This commit is contained in:
Karl Seguin
2025-09-03 19:37:09 +08:00
parent e237e709b6
commit b6137b03cd
12 changed files with 185 additions and 179 deletions

View File

@@ -44,6 +44,7 @@ pub fn reset(self: *Scheduler) void {
const AddOpts = struct {
name: []const u8 = "",
low_priority: bool = false,
};
pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: AddOpts) !void {
if (ms > 5_000) {
@@ -51,7 +52,9 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, func: Task.Func, ms: u32, opts: Ad
// ignore any task that we're almost certainly never going to run
return;
}
return self.primary.add(.{
var q = if (opts.low_priority) &self.secondary else &self.primary;
return q.add(.{
.ms = std.time.milliTimestamp() + ms,
.ctx = ctx,
.func = func,

View File

@@ -428,7 +428,7 @@ fn errorCallback(ctx: *anyopaque, err: anyerror) void {
// It could be pending because:
// (a) we're still downloading its content or
// (b) this is a non-async script that has to be executed in order
const PendingScript = struct {
pub const PendingScript = struct {
script: Script,
complete: bool,
node: OrderList.Node,

View File

@@ -215,7 +215,11 @@ pub const Window = struct {
}
pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 {
return self.createTimeout(cbk, 5, page, .{ .animation_frame = true, .name = "animationFrame" });
return self.createTimeout(cbk, 5, page, .{
.animation_frame = true,
.name = "animationFrame",
.low_priority = true,
});
}
pub fn _cancelAnimationFrame(self: *Window, id: u32) !void {
@@ -269,6 +273,7 @@ pub const Window = struct {
args: []Env.JsObject = &.{},
repeat: bool = false,
animation_frame: bool = false,
low_priority: bool = false,
};
fn createTimeout(self: *Window, cbk: Function, delay_: ?u32, page: *Page, opts: CreateTimeoutOpts) !u32 {
const delay = delay_ orelse 0;
@@ -319,7 +324,10 @@ pub const Window = struct {
.repeat = if (opts.repeat) delay + 1 else null,
};
try page.scheduler.add(callback, TimerCallback.run, delay, .{ .name = opts.name });
try page.scheduler.add(callback, TimerCallback.run, delay, .{
.name = opts.name,
.low_priority = opts.low_priority,
});
return timer_id;
}

View File

@@ -90,16 +90,6 @@ pub const Page = struct {
load_state: LoadState = .parsing,
// Page.wait balances waiting for resources / tasks and producing an output.
// Up until a timeout, Page.wait will always wait for inflight or pending
// HTTP requests, via the Http.Client.active counter. However, intercepted
// requests (via CDP, but it could be anything), aren't considered "active"
// connection. So it's possible that we have intercepted requests (which are
// pending on some driver to continue/abort) while Http.Client.active == 0.
// This boolean exists to supplment Http.Client.active and inform Page.wait
// of pending connections.
request_intercepted: bool = false,
const Mode = union(enum) {
pre: void,
err: anyerror,
@@ -262,8 +252,9 @@ pub const Page = struct {
return self.script_manager.blockingGet(src);
}
pub fn wait(self: *Page, wait_sec: u16) void {
self._wait(wait_sec) catch |err| switch (err) {
pub fn wait(self: *Page, wait_ms: i32) Session.WaitResult {
return self._wait(wait_ms) catch |err| {
switch (err) {
error.JsError => {}, // already logged (with hopefully more context)
else => {
// There may be errors from the http/client or ScriptManager
@@ -273,12 +264,14 @@ pub const Page = struct {
// specific messages.
log.err(.browser, "page wait", .{ .err = err });
},
}
return .done;
};
}
fn _wait(self: *Page, wait_sec: u16) !void {
fn _wait(self: *Page, wait_ms: i32) !Session.WaitResult {
var timer = try std.time.Timer.start();
var ms_remaining: i32 = @intCast(wait_sec * 1000);
var ms_remaining = wait_ms;
var try_catch: Env.TryCatch = undefined;
try_catch.init(self.main_context);
@@ -287,40 +280,42 @@ pub const Page = struct {
var scheduler = &self.scheduler;
var http_client = self.http_client;
// I'd like the page to know NOTHING about extra_socket / CDP, but the
// fact is that the behavior of wait changes depending on whether or
// not we're using CDP.
// If we aren't using CDP, as soon as we think there's nothing left
// to do, we can exit - we'de done.
// But if we are using CDP, we should wait for the whole `wait_ms`
// because the http_click.tick() also monitors the CDP socket. And while
// we could let CDP poll http (like it does for HTTP requests), the fact
// is that we know more about the timing of stuff (e.g. how long to
// poll/sleep) in the page.
const exit_when_done = http_client.extra_socket == null;
// for debugging
// defer self.printWaitAnalysis();
while (true) {
SW: switch (self.mode) {
switch (self.mode) {
.pre, .raw, .text => {
if (self.request_intercepted) {
// the page request was intercepted.
// there shouldn't be any active requests;
std.debug.assert(http_client.active == 0);
// nothing we can do for this, need to kick the can up
// the chain and wait for activity (e.g. a CDP message)
// to unblock this.
return;
}
// The main page hasn't started/finished navigating.
// There's no JS to run, and no reason to run the scheduler.
if (http_client.active == 0) {
if (http_client.active == 0 and exit_when_done) {
// haven't started navigating, I guess.
return;
return .done;
}
// There should only be 1 active http transfer, the main page
// Either we have active http connections, or we're in CDP
// mode with an extra socket. Either way, we're waiting
// for http traffic
if (try http_client.tick(ms_remaining) == .extra_socket) {
// data on a socket we aren't handling, return to caller
return;
return .extra_socket;
}
},
.html, .parsed => {
// The HTML page was parsed. We now either have JS scripts to
// download, or timeouts to execute, or both.
// download, or scheduled tasks to execute, or both.
// scheduler.run could trigger new http transfers, so do not
// store http_client.active BEFORE this call and then use
@@ -333,72 +328,56 @@ pub const Page = struct {
return error.JsError;
}
if (http_client.active == 0) {
if (ms_to_next_task) |ms| {
// There are no HTTP transfers, so there's no point calling
// http_client.tick.
// TODO: should we just force-run the scheduler??
if (http_client.active == 0 and exit_when_done) {
const ms = ms_to_next_task orelse {
// no http transfers, no cdp extra socket, no
// scheduled tasks, we're done.
return .done;
};
if (ms > ms_remaining) {
// we'd wait to long, might as well exit early.
return;
}
_ = try scheduler.runLowPriority();
// We must use a u64 here b/c ms is a u32 and the
// conversion to ns can generate an integer
// overflow.
const _ms: u64 = @intCast(ms);
std.Thread.sleep(std.time.ns_per_ms * _ms);
break :SW;
}
// We have no active http transfer and no pending
// schedule tasks. We're done
return;
// same as above, except we have a scheduled task,
// it just happens to be too far into the future.s
return .done;
}
_ = try scheduler.runLowPriority();
const request_intercepted = self.request_intercepted;
// We want to prioritize processing intercepted requests
// because, the sooner they get unblocked, the sooner we
// can start the HTTP request. But we still want to advanced
// existing HTTP requests, if possible. So, if we have
// intercepted requests, we'll still look at existing HTTP
// requests, but we won't block waiting for more data.
const ms_to_wait =
if (request_intercepted) 0
// But if we have no intercepted requests, we'll wait
// for as long as we can for data to our existing
// inflight requests
else @min(ms_remaining, ms_to_next_task orelse 1000);
// we have a task to run in the not-so-distant future.
// You might think we can just sleep until that task is
// ready, but we should continue to run lowPriority tasks
// in the meantime, and that could unblock things. So
// we'll just sleep for a bit, and then restart our wait
// loop to see what's changed
std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intCast(@min(ms, 20))));
} else {
// We're here because we either have active HTTP
// connections, of exit_when_done == false (aka, there's
// an extra_socket registered with the http client).
_ = try scheduler.runLowPriority();
const ms_to_wait = @min(ms_remaining, ms_to_next_task orelse 100);
if (try http_client.tick(ms_to_wait) == .extra_socket) {
// data on a socket we aren't handling, return to caller
return;
return .extra_socket;
}
if (request_intercepted) {
// Again, proritizing intercepted requests. Exit this
// loop so that our caller can hopefully resolve them
// (i.e. continue or abort them);
return;
}
},
.err => |err| {
self.mode = .{ .raw_done = @errorName(err) };
return err;
},
.raw_done => return,
.raw_done => {
if (exit_when_done) {
return .done;
}
// we _could_ http_client.tick(ms_to_wait), but this has
// the same result, and I feel is more correct.
return .no_page;
}
}
const ms_elapsed = timer.lap() / 1_000_000;
if (ms_elapsed >= ms_remaining) {
return;
return .done;
}
ms_remaining -= @intCast(ms_elapsed);
}
@@ -411,48 +390,53 @@ pub const Page = struct {
std.debug.print("\nactive requests: {d}\n", .{self.http_client.active});
var n_ = self.http_client.handles.in_use.first;
while (n_) |n| {
const transfer = Http.Transfer.fromEasy(n.data.conn.easy) catch |err| {
const handle: *Http.Client.Handle = @fieldParentPtr("node", n);
const transfer = Http.Transfer.fromEasy(handle.conn.easy) catch |err| {
std.debug.print(" - failed to load transfer: {any}\n", .{err});
break;
};
std.debug.print(" - {s}\n", .{transfer});
std.debug.print(" - {f}\n", .{transfer});
n_ = n.next;
}
}
{
std.debug.print("\nqueued requests: {d}\n", .{self.http_client.queue.len});
std.debug.print("\nqueued requests: {d}\n", .{self.http_client.queue.len()});
var n_ = self.http_client.queue.first;
while (n_) |n| {
std.debug.print(" - {s}\n", .{n.data.url});
const transfer: *Http.Transfer = @fieldParentPtr("_node", n);
std.debug.print(" - {f}\n", .{transfer.uri});
n_ = n.next;
}
}
{
std.debug.print("\nscripts: {d}\n", .{self.script_manager.scripts.len});
std.debug.print("\nscripts: {d}\n", .{self.script_manager.scripts.len()});
var n_ = self.script_manager.scripts.first;
while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete });
const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next;
}
}
{
std.debug.print("\ndeferreds: {d}\n", .{self.script_manager.deferreds.len});
std.debug.print("\ndeferreds: {d}\n", .{self.script_manager.deferreds.len()});
var n_ = self.script_manager.deferreds.first;
while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete });
const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next;
}
}
const now = std.time.milliTimestamp();
{
std.debug.print("\nasyncs: {d}\n", .{self.script_manager.asyncs.len});
std.debug.print("\nasyncs: {d}\n", .{self.script_manager.asyncs.len()});
var n_ = self.script_manager.asyncs.first;
while (n_) |n| {
std.debug.print(" - {s} complete: {any}\n", .{ n.data.script.url, n.data.complete });
const ps: *ScriptManager.PendingScript = @fieldParentPtr("node", n);
std.debug.print(" - {s} complete: {any}\n", .{ ps.script.url, ps.complete });
n_ = n.next;
}
}

View File

@@ -137,7 +137,13 @@ pub const Session = struct {
return &(self.page orelse return null);
}
pub fn wait(self: *Session, wait_sec: u16) void {
pub const WaitResult = enum {
done,
no_page,
extra_socket,
};
pub fn wait(self: *Session, wait_ms: i32) WaitResult {
if (self.queued_navigation) |qn| {
// This was already aborted on the page, but it would be pretty
// bad if old requests went to the new page, so let's make double sure
@@ -154,21 +160,24 @@ pub const Session = struct {
.err = err,
.url = qn.url,
});
return;
return .done;
};
page.navigate(qn.url, qn.opts) catch |err| {
log.err(.browser, "queued navigation error", .{ .err = err, .url = qn.url });
return;
return .done;
};
}
if (self.page) |*page| {
page.wait(wait_sec);
return page.wait(wait_ms);
}
return .no_page;
}
};
const QueuedNavigation = struct {
url: []const u8,
opts: NavigateOpts,

View File

@@ -116,11 +116,13 @@ pub fn CDPT(comptime TypeProvider: type) type {
// A bit hacky right now. The main server loop doesn't unblock for
// scheduled task. So we run this directly in order to process any
// timeouts (or http events) which are ready to be processed.
pub fn pageWait(self: *Self) void {
const session = &(self.browser.session orelse return);
// exits early if there's nothing to do, so a large value like
// 5 seconds should be ok
session.wait(5);
pub fn hasPage() bool {
}
pub fn pageWait(self: *Self, ms: i32) Session.WaitResult {
const session = &(self.browser.session orelse return .no_page);
return session.wait(ms);
}
// Called from above, in processMessage which handles client messages

View File

@@ -182,7 +182,6 @@ pub fn requestIntercept(arena: Allocator, bc: anytype, intercept: *const Notific
// unreachable because we _have_ to have a page.
const session_id = bc.session_id orelse unreachable;
const target_id = bc.target_id orelse unreachable;
const page = bc.session.currentPage() orelse unreachable;
// We keep it around to wait for modifications to the request.
// NOTE: we assume whomever created the request created it with a lifetime of the Page.
@@ -211,7 +210,6 @@ pub fn requestIntercept(arena: Allocator, bc: anytype, intercept: *const Notific
// Await either continueRequest, failRequest or fulfillRequest
intercept.wait_for_interception.* = true;
page.request_intercepted = true;
}
fn continueRequest(cmd: anytype) !void {
@@ -229,8 +227,6 @@ fn continueRequest(cmd: anytype) !void {
return error.NotImplemented;
}
const page = bc.session.currentPage() orelse return error.PageNotLoaded;
var intercept_state = &bc.intercept_state;
const request_id = try idFromRequestId(params.requestId);
const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound;
@@ -266,11 +262,6 @@ fn continueRequest(cmd: anytype) !void {
}
try bc.cdp.browser.http_client.process(transfer);
if (intercept_state.empty()) {
page.request_intercepted = false;
}
return cmd.sendResult(null, .{});
}
@@ -292,8 +283,6 @@ fn continueWithAuth(cmd: anytype) !void {
},
})) orelse return error.InvalidParams;
const page = bc.session.currentPage() orelse return error.PageNotLoaded;
var intercept_state = &bc.intercept_state;
const request_id = try idFromRequestId(params.requestId);
const transfer = intercept_state.remove(request_id) orelse return error.RequestNotFound;
@@ -323,11 +312,6 @@ fn continueWithAuth(cmd: anytype) !void {
transfer.reset();
try bc.cdp.browser.http_client.process(transfer);
if (intercept_state.empty()) {
page.request_intercepted = false;
}
return cmd.sendResult(null, .{});
}
@@ -380,8 +364,6 @@ fn failRequest(cmd: anytype) !void {
errorReason: ErrorReason,
})) orelse return error.InvalidParams;
const page = bc.session.currentPage() orelse return error.PageNotLoaded;
var intercept_state = &bc.intercept_state;
const request_id = try idFromRequestId(params.requestId);
@@ -394,10 +376,6 @@ fn failRequest(cmd: anytype) !void {
.url = transfer.uri,
.reason = params.errorReason,
});
if (intercept_state.empty()) {
page.request_intercepted = false;
}
return cmd.sendResult(null, .{});
}
@@ -405,7 +383,6 @@ pub fn requestAuthRequired(arena: Allocator, bc: anytype, intercept: *const Noti
// unreachable because we _have_ to have a page.
const session_id = bc.session_id orelse unreachable;
const target_id = bc.target_id orelse unreachable;
const page = bc.session.currentPage() orelse unreachable;
// We keep it around to wait for modifications to the request.
// NOTE: we assume whomever created the request created it with a lifetime of the Page.
@@ -442,7 +419,6 @@ pub fn requestAuthRequired(arena: Allocator, bc: anytype, intercept: *const Noti
// Await continueWithAuth
intercept.wait_for_interception.* = true;
page.request_intercepted = true;
}
// Get u64 from requestId which is formatted as: "INTERCEPT-{d}"

View File

@@ -523,7 +523,7 @@ const Handles = struct {
};
// wraps a c.CURL (an easy handle)
const Handle = struct {
pub const Handle = struct {
client: *Client,
conn: Http.Connection,
node: Handles.HandleList.Node,

View File

@@ -137,7 +137,7 @@ fn run(alloc: Allocator) !void {
const server = &_server.?;
defer server.deinit();
server.run(address, opts.timeout) catch |err| {
server.run(address, opts.timeout * 1000) catch |err| {
log.fatal(.app, "server run error", .{ .err = err });
return err;
};
@@ -166,7 +166,7 @@ fn run(alloc: Allocator) !void {
},
};
session.wait(5); // 5 seconds
_ = session.wait(5000); // 5 seconds
// dump
if (opts.dump) {

View File

@@ -109,7 +109,7 @@ fn run(
const url = try std.fmt.allocPrint(arena, "http://localhost:9582/{s}", .{test_file});
try page.navigate(url, .{});
page.wait(2);
_ = page.wait(2000);
const js_context = page.main_context;
var try_catch: Env.TryCatch = undefined;

View File

@@ -124,48 +124,58 @@ pub const Server = struct {
client.* = try Client.init(socket, self);
defer client.deinit();
var last_message = timestamp();
var http = &self.app.http;
http.monitorSocket(socket);
defer http.unmonitorSocket();
std.debug.assert(client.mode == .http);
while (true) {
if (http.poll(10) == .extra_socket) {
const n = posix.read(socket, client.readBuf()) catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return;
}
const more = client.processData(n) catch false;
if (!more) {
return;
}
last_message = timestamp();
} else if (timestamp() - last_message > timeout_ms) {
if (http.poll(timeout_ms) != .extra_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
// We have 3 types of "events":
// - Incoming CDP messages
// - Network events from the browser
// - Timeouts from the browser
// The call to http.poll above handles the first two (which is why
// we pass the client socket to it). But browser timeouts aren't
// hooked into that. So we need to go to the browser page (if there
// is one), and ask it to process any pending events. That action
// doesn't starve #2 (Network events from the browser), because
// page.wait() handles that too. But it does starve #1 (Incoming CDP
// messages). The good news is that, while the Page is mostly
// unaware of CDP, it will only block if it actually has something to
// do AND it knows if we're waiting on an intercept request, and will
// eagerly return control here in those cases.
if (try client.readSocket() == false) {
return;
}
if (client.mode == .cdp) {
client.mode.cdp.pageWait();
break; // switch to our CDP loop
}
}
var cdp = &client.mode.cdp;
var last_message = timestamp();
var ms_remaining = timeout_ms;
while (true) {
switch (cdp.pageWait(ms_remaining)) {
.extra_socket => {
if (try client.readSocket() == false) {
return;
}
last_message = timestamp();
ms_remaining = timeout_ms;
},
.no_page => {
if (http.poll(ms_remaining) != .extra_socket) {
log.info(.app, "CDP timeout", .{});
return;
}
if (try client.readSocket() == false) {
return;
}
last_message = timestamp();
ms_remaining = timeout_ms;
},
.done => {
std.debug.print("ok\n", .{});
const elapsed = timestamp() - last_message;
if (elapsed > ms_remaining) {
log.info(.app, "CDP timeout", .{});
return;
}
ms_remaining -= @as(i32, @intCast(elapsed));
},
}
}
}
@@ -222,6 +232,20 @@ pub const Client = struct {
self.send_arena.deinit();
}
fn readSocket(self: *Client) !bool {
const n = posix.read(self.socket, self.readBuf()) catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
}
return self.processData(n) catch false;
}
fn readBuf(self: *Client) []u8 {
return self.reader.readBuf();
}

View File

@@ -424,7 +424,7 @@ pub const JsRunner = struct {
}
return err;
};
self.page.session.wait(1);
self.page.session.wait(1000);
@import("root").js_runner_duration += std.time.Instant.since(try std.time.Instant.now(), start);
if (case.@"1") |expected| {
@@ -518,7 +518,7 @@ pub fn htmlRunner(file: []const u8) !void {
const url = try std.fmt.allocPrint(arena_allocator, "http://localhost:9582/src/tests/{s}", .{file});
try page.navigate(url, .{});
page.wait(2);
_ = page.wait(2000);
const value = js_context.exec("testing.getStatus()", "testing.getStatus()") catch |err| {
const msg = try_catch.err(arena_allocator) catch @errorName(err) orelse "unknown";