From 3a5aa87853335f73c29ad407b33c8e1c285a77ee Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 5 Jun 2025 12:32:09 +0800 Subject: [PATCH 1/4] Optimize the lifecycle of async requests Async HTTP request work by emitting a "Progress" object to a callback. This object has a "done" flag which, when `true`, indicates that all data has been emitting and no future "Progress" objects will be sent. Callers like XHR buffer the response and wait for "done = true" to then process the request. The HTTP client relies on two important object pools: the connection and the state (with all the buffers for reading/writing). In its current implementation, the async flow does not release these pooled objects until the final callback has returned. At best, this is inefficient: we're keeping the connection and state objects checked out for longer than they have to be. At worse, it can lead to a deadlock. If the calling code issues a new request when done == true, we'll eventually run out of state objects in the pool. This commit now releases the state objects before emit the final "done" Progress message. For this to work, this final message will always have null data and an empty header object. --- src/browser/xhr/xhr.zig | 3 +-- src/http/client.zig | 40 ++++++++++++++++++++++++++++++++++------ 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/browser/xhr/xhr.zig b/src/browser/xhr/xhr.zig index d65f9db8..3f1dfc2a 100644 --- a/src/browser/xhr/xhr.zig +++ b/src/browser/xhr/xhr.zig @@ -468,7 +468,6 @@ pub const XMLHttpRequest = struct { if (progress.first) { const header = progress.header; - log.debug(.http, "request header", .{ .source = "xhr", .url = self.url, @@ -522,7 +521,7 @@ pub const XMLHttpRequest = struct { log.info(.http, "request complete", .{ .source = "xhr", .url = self.url, - .status = progress.header.status, + .status = self.response_status, }); // Not that the request is done, the http/client will free the request diff --git a/src/http/client.zig b/src/http/client.zig index 81b11cd7..968402b7 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -828,6 +828,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { wait, done, need_more, + handler_error, }; fn deinit(self: *Self) void { @@ -966,12 +967,35 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { switch (status) { .wait => {}, .need_more => self.receive(), + .handler_error => { + // handler should never have been called if we're redirecting + std.debug.assert(self.redirect == null); + self.request.requestCompleted(self.reader.response); + self.deinit(); + return; + }, .done => { const redirect = self.redirect orelse { + var handler = self.handler; self.request.requestCompleted(self.reader.response); self.deinit(); + + // Emit the done chunk. We expect the caller to do + // processing once the full request is completed. By + // emiting this AFTER we've relreased the connection, + // we free the connection and its state for re-use. + // If we don't do this this way, we can end up with + // _a lot_ of pending request/states. + // DO NOT USE `self` here, it's no longer valid. + handler.onHttpResponse(.{ + .data = null, + .done = true, + .first = false, + .header = .{}, + }) catch {}; return; }; + self.request.redirectAsync(redirect, self.loop, self.handler) catch |err| { self.handleError("Setup async redirect", err); return; @@ -1116,22 +1140,22 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { self.handler.onHttpResponse(.{ .data = chunk, .first = first, - .done = next == null, + .done = false, .header = reader.response, - }) catch return .done; + }) catch return .handler_error; first = false; } } - } else if (result.data != null or done or would_be_first) { + } else if (result.data != null or would_be_first) { // If we have data. Or if the request is done. Or if this is the // first time we have a complete header. Emit the chunk. self.handler.onHttpResponse(.{ - .done = done, + .done = false, .data = result.data, .first = would_be_first, .header = reader.response, - }) catch return .done; + }) catch return .handler_error; } if (done == true) { @@ -3135,7 +3159,8 @@ const CaptureHandler = struct { const progress = try progress_; const allocator = self.response.arena.allocator(); try self.response.body.appendSlice(allocator, progress.data orelse ""); - if (progress.done) { + if (progress.first) { + std.debug.assert(!progress.done); self.response.status = progress.header.status; try self.response.headers.ensureTotalCapacity(allocator, progress.header.headers.items.len); for (progress.header.headers.items) |header| { @@ -3144,6 +3169,9 @@ const CaptureHandler = struct { .value = try allocator.dupe(u8, header.value), }); } + } + + if (progress.done) { self.reset.set(); } } From 48c25c380dde1b3d6d436ddd513d4ef386de0920 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 5 Jun 2025 20:07:51 +0800 Subject: [PATCH 2/4] Removing blocking code async HTTP request The HTTP Client has a state pool. It blocks when we've exceeded max_concurrency. This can block processing forever. A simple way to reproduce this is to go into the demo cdp.js, and execute the XHR request 5 times (loading json/product.json) To some degree, I think this is a result of weird / non-intuitive execution flow. If you exec a JS with 100 XHR requests, it'll call our XHR _send function but none of these will execute until the loop is run (after the script is done being executed). This can result in poor utilization of our connection and state pool. For an async request, getting the *Request object is itself now asynchronous. If no state is available, we use the Loop's timeout (at 20ms) to keep checking for an available state. --- src/app.zig | 3 +- src/browser/page.zig | 4 +- src/browser/xhr/xhr.zig | 28 +++-- src/http/client.zig | 219 +++++++++++++++++++++++++++++++++------- 4 files changed, 206 insertions(+), 48 deletions(-) diff --git a/src/app.zig b/src/app.zig index 2501d832..803c55f0 100644 --- a/src/app.zig +++ b/src/app.zig @@ -52,7 +52,8 @@ pub const App = struct { .telemetry = undefined, .app_dir_path = app_dir_path, .notification = notification, - .http_client = try HttpClient.init(allocator, 5, .{ + .http_client = try HttpClient.init(allocator, .{ + .max_concurrent = 3, .http_proxy = config.http_proxy, .tls_verify_host = config.tls_verify_host, }), diff --git a/src/browser/page.zig b/src/browser/page.zig index 83f1136b..9514ef0d 100644 --- a/src/browser/page.zig +++ b/src/browser/page.zig @@ -113,7 +113,9 @@ pub const Page = struct { .cookie_jar = &session.cookie_jar, .microtask_node = .{ .func = microtaskCallback }, .window_clicked_event_node = .{ .func = windowClicked }, - .request_factory = browser.http_client.requestFactory(browser.notification), + .request_factory = browser.http_client.requestFactory(.{ + .notification = browser.notification, + }), .scope = undefined, .module_map = .empty, }; diff --git a/src/browser/xhr/xhr.zig b/src/browser/xhr/xhr.zig index 3f1dfc2a..21b12ce7 100644 --- a/src/browser/xhr/xhr.zig +++ b/src/browser/xhr/xhr.zig @@ -30,6 +30,7 @@ const Mime = @import("../mime.zig").Mime; const parser = @import("../netsurf.zig"); const http = @import("../../http/client.zig"); const Page = @import("../page.zig").Page; +const Loop = @import("../../runtime/loop.zig").Loop; const CookieJar = @import("../storage/storage.zig").CookieJar; // XHR interfaces @@ -78,6 +79,7 @@ const XMLHttpRequestBodyInit = union(enum) { pub const XMLHttpRequest = struct { proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{}, + loop: *Loop, arena: Allocator, request: ?*http.Request = null, @@ -91,6 +93,7 @@ pub const XMLHttpRequest = struct { sync: bool = true, err: ?anyerror = null, last_dispatch: i64 = 0, + request_body: ?[]const u8 = null, cookie_jar: *CookieJar, // the URI of the page where this request is originating from @@ -241,12 +244,13 @@ pub const XMLHttpRequest = struct { pub fn constructor(page: *Page) !XMLHttpRequest { const arena = page.arena; return .{ + .url = null, .arena = arena, + .loop = page.loop, .headers = Headers.init(arena), .response_headers = Headers.init(arena), .method = undefined, .state = .unsent, - .url = null, .origin_url = &page.url, .cookie_jar = page.cookie_jar, }; @@ -422,10 +426,16 @@ pub const XMLHttpRequest = struct { log.debug(.http, "request", .{ .method = self.method, .url = self.url, .source = "xhr" }); self.send_flag = true; + if (body) |b| { + self.request_body = try self.arena.dupe(u8, b); + } - self.request = try page.request_factory.create(self.method, &self.url.?.uri); - var request = self.request.?; - errdefer request.deinit(); + try page.request_factory.initAsync(page.arena, self.method, &self.url.?.uri, self, onHttpRequestReady, self.loop,); + } + + fn onHttpRequestReady(ctx: *anyopaque, request: *http.Request) !void { + // on error, our caller will cleanup request + const self: *XMLHttpRequest = @alignCast(@ptrCast(ctx)); for (self.headers.list.items) |hdr| { try request.addHeader(hdr.name, hdr.value, .{}); @@ -433,7 +443,7 @@ pub const XMLHttpRequest = struct { { var arr: std.ArrayListUnmanaged(u8) = .{}; - try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(page.arena), .{ + try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(self.arena), .{ .navigation = false, .origin_uri = &self.origin_url.uri, }); @@ -447,14 +457,15 @@ pub const XMLHttpRequest = struct { // if the request method is GET or HEAD. // https://xhr.spec.whatwg.org/#the-send()-method // var used_body: ?XMLHttpRequestBodyInit = null; - if (body) |b| { + if (self.request_body) |b| { if (self.method != .GET and self.method != .HEAD) { - request.body = try page.arena.dupe(u8, b); + request.body = b; try request.addHeader("Content-Type", "text/plain; charset=UTF-8", .{}); } } - try request.sendAsync(page.loop, self, .{}); + try request.sendAsync(self.loop, self, .{}); + self.request = request; } pub fn onHttpResponse(self: *XMLHttpRequest, progress_: anyerror!http.Progress) !void { @@ -468,6 +479,7 @@ pub const XMLHttpRequest = struct { if (progress.first) { const header = progress.header; + log.debug(.http, "request header", .{ .source = "xhr", .url = self.url, diff --git a/src/http/client.zig b/src/http/client.zig index 968402b7..e336e49e 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -54,16 +54,17 @@ pub const Client = struct { request_pool: std.heap.MemoryPool(Request), const Opts = struct { - tls_verify_host: bool = true, + max_concurrent: usize = 3, http_proxy: ?std.Uri = null, + tls_verify_host: bool = true, max_idle_connection: usize = 10, }; - pub fn init(allocator: Allocator, max_concurrent: usize, opts: Opts) !Client { + pub fn init(allocator: Allocator, opts: Opts) !Client { var root_ca: tls.config.CertBundle = if (builtin.is_test) .{} else try tls.config.CertBundle.fromSystem(allocator); errdefer root_ca.deinit(allocator); - const state_pool = try StatePool.init(allocator, max_concurrent); + const state_pool = try StatePool.init(allocator, opts.max_concurrent); errdefer state_pool.deinit(allocator); const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection); @@ -92,13 +93,62 @@ pub const Client = struct { } pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !*Request { - const state = self.state_pool.acquire(); + const state = self.state_pool.acquireWait(); + errdefer self.state_pool.release(state); - errdefer { - state.reset(); - self.state_pool.release(state); + const req = try self.request_pool.create(); + errdefer self.request_pool.destroy(req); + + req.* = try Request.init(self, state, method, uri); + return req; + } + + pub fn initAsync( + self: *Client, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + opts: RequestOpts, + ) !void { + if (self.state_pool.acquireOrNull()) |state| { + // if we have state ready, we can skip the loop and immediately + // kick this request off. + return self.asyncRequestReady(method, uri, ctx, callback, state, opts); } + // This cannot be a client-owned MemoryPool. The page can end before + // this is ever completed (and the check callback will never be called). + // As long as the loop doesn't guarantee that callbacks will be called, + // this _has_ to be the page arena. + const queue = try arena.create(AsyncQueue); + queue.* = .{ + .ctx = ctx, + .uri = uri, + .opts = opts, + .client = self, + .method = method, + .callback = callback, + .node = .{ .func = AsyncQueue.check }, + }; + _ = try loop.timeout(10 * std.time.ns_per_ms, &queue.node); + } + + // Either called directly from initAsync (if we have a state ready) + // Or from when the AsyncQueue(T) is ready. + fn asyncRequestReady( + self: *Client, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + state: *State, + opts: RequestOpts, + ) !void { + errdefer self.state_pool.release(state); + // We need the request on the heap, because it can have a longer lifetime // than the code making the request. That sounds odd, but consider the // case of an XHR request: it can still be inflight (e.g. waiting for @@ -110,26 +160,78 @@ pub const Client = struct { errdefer self.request_pool.destroy(req); req.* = try Request.init(self, state, method, uri); - return req; + if (opts.notification) |notification| { + req.notification = notification; + } + + errdefer req.deinit(); + try callback(ctx, req); } - pub fn requestFactory(self: *Client, notification: ?*Notification) RequestFactory { + pub fn requestFactory(self: *Client, opts: RequestOpts) RequestFactory { return .{ + .opts = opts, .client = self, - .notification = notification, }; } }; +const RequestOpts = struct { + notification: ?*Notification = null, +}; + // A factory for creating requests with a given set of options. pub const RequestFactory = struct { client: *Client, - notification: ?*Notification, + opts: RequestOpts, - pub fn create(self: RequestFactory, method: Request.Method, uri: *const Uri) !*Request { - var req = try self.client.request(method, uri); - req.notification = self.notification; - return req; + pub fn initAsync( + self: RequestFactory, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + ) !void { + return self.client.initAsync(arena, method, uri, ctx, callback, loop, self.opts); + } +}; + +const AsyncQueue = struct { + ctx: *anyopaque, + method: Request.Method, + uri: *const Uri, + client: *Client, + opts: RequestOpts, + node: Loop.CallbackNode, + callback: Callback, + + const Callback = *const fn (*anyopaque, *Request) anyerror!void; + + fn check(node: *Loop.CallbackNode, repeat_delay: *?u63) void { + const self: *AsyncQueue = @fieldParentPtr("node", node); + self._check(repeat_delay) catch |err| { + log.err(.http_client, "async queue check", .{ .err = err }); + }; + } + + fn _check(self: *AsyncQueue, repeat_delay: *?u63) !void { + const client = self.client; + const state = client.state_pool.acquireOrNull() orelse { + // re-run this function in 10 milliseconds + repeat_delay.* = 10 * std.time.ns_per_ms; + return; + }; + + try client.asyncRequestReady( + self.method, + self.uri, + self.ctx, + self.callback, + state, + self.opts, + ); } }; @@ -321,7 +423,6 @@ pub const Request = struct { pub fn deinit(self: *Request) void { self.releaseConnection(); - _ = self._state.reset(); self._client.state_pool.release(self._state); self._client.request_pool.destroy(self); } @@ -1137,6 +1238,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { self.handleError("decompression error", err); return .done; }; + self.handler.onHttpResponse(.{ .data = chunk, .first = first, @@ -2346,7 +2448,7 @@ const State = struct { } fn reset(self: *State) void { - _ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 }); + _ = self.arena.reset(.{ .retain_with_limit = 64 * 1024 }); } fn deinit(self: *State) void { @@ -2399,10 +2501,11 @@ const StatePool = struct { allocator.free(self.states); } - pub fn acquire(self: *StatePool) *State { + pub fn acquireWait(self: *StatePool) *State { + const states = self.states; + self.mutex.lock(); while (true) { - const states = self.states; const available = self.available; if (available == 0) { self.cond.wait(&self.mutex); @@ -2416,7 +2519,25 @@ const StatePool = struct { } } + pub fn acquireOrNull(self: *StatePool) ?*State { + const states = self.states; + + self.mutex.lock(); + defer self.mutex.unlock(); + + const available = self.available; + if (available == 0) { + return null; + } + + const index = available - 1; + const state = states[index]; + self.available = index; + return state; + } + pub fn release(self: *StatePool, state: *State) void { + state.reset(); self.mutex.lock(); var states = self.states; const available = self.available; @@ -2823,11 +2944,19 @@ test "HttpClient: sync GET redirect" { } test "HttpClient: async connect error" { + defer testing.reset(); var loop = try Loop.init(testing.allocator); defer loop.deinit(); const Handler = struct { + loop: *Loop, reset: *Thread.ResetEvent, + + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *@This() = @alignCast(@ptrCast(ctx)); + try req.sendAsync(self.loop, self, .{}); + } + fn onHttpResponse(self: *@This(), res: anyerror!Progress) !void { _ = res catch |err| { if (err == error.ConnectionRefused) { @@ -2845,14 +2974,21 @@ test "HttpClient: async connect error" { var client = try testClient(); defer client.deinit(); + var handler = Handler{ + .loop = &loop, + .reset = &reset, + }; + const uri = try Uri.parse("HTTP://127.0.0.1:9920"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&loop, Handler{ .reset = &reset }, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, Handler.requestReady, &loop, .{}, ); + try loop.io.run_for_ns(std.time.ns_per_ms); try reset.timedWait(std.time.ns_per_s); } test "HttpClient: async no body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2860,8 +2996,7 @@ test "HttpClient: async no body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2871,6 +3006,8 @@ test "HttpClient: async no body" { } test "HttpClient: async with body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2878,8 +3015,7 @@ test "HttpClient: async with body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/echo"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2894,6 +3030,8 @@ test "HttpClient: async with body" { } test "HttpClient: async with gzip body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2901,8 +3039,7 @@ test "HttpClient: async with gzip body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/gzip"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2916,6 +3053,8 @@ test "HttpClient: async with gzip body" { } test "HttpClient: async redirect" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2923,8 +3062,7 @@ test "HttpClient: async redirect" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/redirect"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); // Called twice on purpose. The initial GET resutls in the # of pending // events to reach 0. This causes our `run_for_ns` to return. But we then @@ -2945,6 +3083,7 @@ test "HttpClient: async redirect" { } test "HttpClient: async tls no body" { + defer testing.reset(); var client = try testClient(); defer client.deinit(); for (0..5) |_| { @@ -2952,8 +3091,7 @@ test "HttpClient: async tls no body" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2969,6 +3107,7 @@ test "HttpClient: async tls no body" { } test "HttpClient: async tls with body x" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -2977,8 +3116,7 @@ test "HttpClient: async tls with body x" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/body"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2993,6 +3131,7 @@ test "HttpClient: async tls with body x" { } test "HttpClient: async redirect from TLS to Plaintext" { + defer testing.reset(); for (0..1) |_| { var client = try testClient(); defer client.deinit(); @@ -3001,8 +3140,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { defer handler.deinit(); const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -3018,6 +3156,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { } test "HttpClient: async redirect plaintext to TLS" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -3026,8 +3165,7 @@ test "HttpClient: async redirect plaintext to TLS" { defer handler.deinit(); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -3149,6 +3287,11 @@ const CaptureHandler = struct { self.loop.deinit(); } + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *CaptureHandler = @alignCast(@ptrCast(ctx)); + try req.sendAsync(&self.loop, self, .{ .tls_verify_host = false }); + } + fn onHttpResponse(self: *CaptureHandler, progress_: anyerror!Progress) !void { self.process(progress_) catch |err| { std.debug.print("capture handler error: {}\n", .{err}); @@ -3230,5 +3373,5 @@ fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { } fn testClient() !Client { - return try Client.init(testing.allocator, 1, .{}); + return try Client.init(testing.allocator, .{ .max_concurrent = 1 }); } From a5d87ab948cb99a2cbb2d60ac77354752dc27234 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Thu, 5 Jun 2025 23:38:30 +0800 Subject: [PATCH 3/4] Reduce duration of the main request We currently keep the main request open during loadHTMLDoc and processHTMLDoc. It _has_ to be open during loadHTMLDoc, since that streams the body. But it does not have to be open during processHTMLDoc, which can be log and itself could make use of that same connection if it was released. Reorganized the navigate flow to limit the scope of the request. Also, just like we track pending_write and pending_read, we now also track pending_connect and only shutdown when all are not pending. --- src/browser/html/window.zig | 4 +- src/browser/page.zig | 85 ++++++++++++++++++++----------------- src/browser/xhr/xhr.zig | 9 +++- src/http/client.zig | 23 ++++++++-- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/src/browser/html/window.zig b/src/browser/html/window.zig index d65b47fa..02381971 100644 --- a/src/browser/html/window.zig +++ b/src/browser/html/window.zig @@ -164,7 +164,7 @@ pub const Window = struct { } pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 { - return self.createTimeout(cbk, 5, page, .{.animation_frame = true}); + return self.createTimeout(cbk, 5, page, .{ .animation_frame = true }); } pub fn _cancelAnimationFrame(self: *Window, id: u32, page: *Page) !void { @@ -179,7 +179,7 @@ pub const Window = struct { // TODO handle callback arguments. pub fn _setInterval(self: *Window, cbk: Function, delay: ?u32, page: *Page) !u32 { - return self.createTimeout(cbk, delay, page, .{.repeat = true}); + return self.createTimeout(cbk, delay, page, .{ .repeat = true }); } pub fn _clearTimeout(self: *Window, id: u32, page: *Page) !void { diff --git a/src/browser/page.zig b/src/browser/page.zig index 9514ef0d..7044a4d9 100644 --- a/src/browser/page.zig +++ b/src/browser/page.zig @@ -207,58 +207,63 @@ pub const Page = struct { // redirect) self.url = request_url; - // load the data - var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); - defer request.deinit(); - request.body = opts.body; - request.notification = notification; + { + // block exists to limit the lifetime of the request, which holds + // onto a connection + var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); + defer request.deinit(); - notification.dispatch(.page_navigate, &.{ - .opts = opts, - .url = &self.url, - .timestamp = timestamp(), - }); + request.body = opts.body; + request.notification = notification; - var response = try request.sendSync(.{}); + notification.dispatch(.page_navigate, &.{ + .opts = opts, + .url = &self.url, + .timestamp = timestamp(), + }); - // would be different than self.url in the case of a redirect - self.url = try URL.fromURI(arena, request.request_uri); + var response = try request.sendSync(.{}); - const header = response.header; - try session.cookie_jar.populateFromResponse(&self.url.uri, &header); + // would be different than self.url in the case of a redirect + self.url = try URL.fromURI(arena, request.request_uri); - // TODO handle fragment in url. - try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); + const header = response.header; + try session.cookie_jar.populateFromResponse(&self.url.uri, &header); - const content_type = header.get("content-type"); + // TODO handle fragment in url. + try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); - const mime: Mime = blk: { - if (content_type) |ct| { - break :blk try Mime.parse(arena, ct); + const content_type = header.get("content-type"); + + const mime: Mime = blk: { + if (content_type) |ct| { + break :blk try Mime.parse(arena, ct); + } + break :blk Mime.sniff(try response.peek()); + } orelse .unknown; + + log.info(.http, "navigation", .{ + .status = header.status, + .content_type = content_type, + .charset = mime.charset, + .url = request_url, + }); + + if (!mime.isHTML()) { + var arr: std.ArrayListUnmanaged(u8) = .{}; + while (try response.next()) |data| { + try arr.appendSlice(arena, try arena.dupe(u8, data)); + } + // save the body into the page. + self.raw_data = arr.items; + return; } - break :blk Mime.sniff(try response.peek()); - } orelse .unknown; - log.info(.http, "navigation", .{ - .status = header.status, - .content_type = content_type, - .charset = mime.charset, - .url = request_url, - }); - - if (mime.isHTML()) { - self.raw_data = null; try self.loadHTMLDoc(&response, mime.charset orelse "utf-8"); - try self.processHTMLDoc(); - } else { - var arr: std.ArrayListUnmanaged(u8) = .{}; - while (try response.next()) |data| { - try arr.appendSlice(arena, try arena.dupe(u8, data)); - } - // save the body into the page. - self.raw_data = arr.items; } + try self.processHTMLDoc(); + notification.dispatch(.page_navigated, &.{ .url = &self.url, .timestamp = timestamp(), diff --git a/src/browser/xhr/xhr.zig b/src/browser/xhr/xhr.zig index 21b12ce7..7c1087a4 100644 --- a/src/browser/xhr/xhr.zig +++ b/src/browser/xhr/xhr.zig @@ -430,7 +430,14 @@ pub const XMLHttpRequest = struct { self.request_body = try self.arena.dupe(u8, b); } - try page.request_factory.initAsync(page.arena, self.method, &self.url.?.uri, self, onHttpRequestReady, self.loop,); + try page.request_factory.initAsync( + page.arena, + self.method, + &self.url.?.uri, + self, + onHttpRequestReady, + self.loop, + ); } fn onHttpRequestReady(ctx: *anyopaque, request: *http.Request) !void { diff --git a/src/http/client.zig b/src/http/client.zig index e336e49e..83536c4e 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -677,6 +677,7 @@ pub const Request = struct { if (self._connection_from_keepalive) { // we're already connected + async_handler.pending_connect = false; return async_handler.conn.connected(); } @@ -915,6 +916,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { shutdown: bool = false, pending_write: bool = false, pending_receive: bool = false, + pending_connect: bool = true, const Self = @This(); const SendQueue = std.DoublyLinkedList([]const u8); @@ -939,10 +941,15 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn abort(ctx: *anyopaque) void { var self: *Self = @alignCast(@ptrCast(ctx)); self.shutdown = true; + posix.shutdown(self.request._connection.?.socket, .both) catch {}; self.maybeShutdown(); } fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void { + self.pending_connect = false; + if (self.shutdown) { + return self.maybeShutdown(); + } result catch |err| return self.handleError("Connection failed", err); self.conn.connected() catch |err| { self.handleError("connected handler error", err); @@ -1109,7 +1116,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn maybeShutdown(self: *Self) void { std.debug.assert(self.shutdown); - if (self.pending_write or self.pending_receive) { + if (self.pending_write or self.pending_receive or self.pending_connect) { return; } @@ -2538,12 +2545,14 @@ const StatePool = struct { pub fn release(self: *StatePool, state: *State) void { state.reset(); - self.mutex.lock(); var states = self.states; + + self.mutex.lock(); const available = self.available; states[available] = state; self.available = available + 1; self.mutex.unlock(); + self.cond.signal(); } }; @@ -2980,7 +2989,15 @@ test "HttpClient: async connect error" { }; const uri = try Uri.parse("HTTP://127.0.0.1:9920"); - try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, Handler.requestReady, &loop, .{}, ); + try client.initAsync( + testing.arena_allocator, + .GET, + &uri, + &handler, + Handler.requestReady, + &loop, + .{}, + ); try loop.io.run_for_ns(std.time.ns_per_ms); try reset.timedWait(std.time.ns_per_s); From fdd1a778f350dce2e55205b18dac0cb8e01448da Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Fri, 6 Jun 2025 12:53:45 +0800 Subject: [PATCH 4/4] Properly drain event loop when navigating between pages --- src/browser/session.zig | 31 +++++++++++++++---- src/cdp/domains/input.zig | 2 +- src/cdp/domains/target.zig | 2 +- src/runtime/loop.zig | 61 +++++++++++++++++++++----------------- 4 files changed, 61 insertions(+), 35 deletions(-) diff --git a/src/browser/session.zig b/src/browser/session.zig index 44fce9d3..5d550a5e 100644 --- a/src/browser/session.zig +++ b/src/browser/session.zig @@ -72,7 +72,7 @@ pub const Session = struct { pub fn deinit(self: *Session) void { if (self.page != null) { - self.removePage(); + self.removePage() catch {}; } self.cookie_jar.deinit(); self.storage_shed.deinit(); @@ -104,14 +104,35 @@ pub const Session = struct { return page; } - pub fn removePage(self: *Session) void { + pub fn removePage(self: *Session) !void { // Inform CDP the page is going to be removed, allowing other worlds to remove themselves before the main one self.browser.notification.dispatch(.page_remove, .{}); std.debug.assert(self.page != null); - // Reset all existing callbacks. - self.browser.app.loop.reset(); + + // Cleanup is a bit sensitive. We could still have inflight I/O. For + // example, we could have an XHR request which is still in the connect + // phase. It's important that we clean these up, as they're holding onto + // limited resources (like our fixed-sized http state pool). + // + // First thing we do, is endScope() which will execute the destructor + // of any type that registered a destructor (e.g. XMLHttpRequest). + // This will shutdown any pending sockets, which begins our cleaning + // processed self.executor.endScope(); + + // Second thing we do is reset the loop. This increments the loop ctx_id + // so that any "stale" timeouts we process will get ignored. We need to + // do this BEFORE running the loop because, at this point, things like + // window.setTimeout and running microtasks should be ignored + self.browser.app.loop.reset(); + + // Finally, we run the loop. Because of the reset just above, this will + // ignore any timeouts. And, because of the endScope about this, it + // should ensure that the http requests detect the shutdown socket and + // release their resources. + try self.browser.app.loop.run(); + self.page = null; // clear netsurf memory arena. @@ -143,7 +164,7 @@ pub const Session = struct { // the final URL, possibly following redirects) const url = try self.page.?.url.resolve(self.transfer_arena, url_string); - self.removePage(); + try self.removePage(); var page = try self.createPage(); return page.navigate(url, opts); } diff --git a/src/cdp/domains/input.zig b/src/cdp/domains/input.zig index b6897dc2..170098ff 100644 --- a/src/cdp/domains/input.zig +++ b/src/cdp/domains/input.zig @@ -90,7 +90,7 @@ fn clickNavigate(cmd: anytype, uri: std.Uri) !void { .disposition = "currentTab", }, .{ .session_id = bc.session_id.? }); - bc.session.removePage(); + try bc.session.removePage(); _ = try bc.session.createPage(null); try @import("page.zig").navigateToUrl(cmd, url, false); diff --git a/src/cdp/domains/target.zig b/src/cdp/domains/target.zig index f6c197ce..77b46d0a 100644 --- a/src/cdp/domains/target.zig +++ b/src/cdp/domains/target.zig @@ -220,7 +220,7 @@ fn closeTarget(cmd: anytype) !void { bc.session_id = null; } - bc.session.removePage(); + try bc.session.removePage(); if (bc.isolated_world) |*world| { world.deinit(); bc.isolated_world = null; diff --git a/src/runtime/loop.zig b/src/runtime/loop.zig index 0a954e18..a1af200e 100644 --- a/src/runtime/loop.zig +++ b/src/runtime/loop.zig @@ -34,9 +34,11 @@ pub const Loop = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? io: IO, - // Used to track how many callbacks are to be called and wait until all - // event are finished. - events_nb: usize, + // number of pending network events we have + pending_network_count: usize, + + // number of pending timeout events we have + pending_timeout_count: usize, // Used to stop repeating timeouts when loop.run is called. stopping: bool, @@ -66,8 +68,9 @@ pub const Loop = struct { .alloc = alloc, .cancelled = .{}, .io = try IO.init(32, 0), - .events_nb = 0, .stopping = false, + .pending_network_count = 0, + .pending_timeout_count = 0, .timeout_pool = MemoryPool(ContextTimeout).init(alloc), .event_callback_pool = MemoryPool(EventCallbackContext).init(alloc), }; @@ -78,7 +81,7 @@ pub const Loop = struct { // run tail events. We do run the tail events to ensure all the // contexts are correcly free. - while (self.eventsNb() > 0) { + while (self.hasPendinEvents()) { self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| { log.err(.loop, "deinit", .{ .err = err }); break; @@ -93,6 +96,21 @@ pub const Loop = struct { self.cancelled.deinit(self.alloc); } + // We can shutdown once all the pending network IO is complete. + // In debug mode we also wait until al the pending timeouts are complete + // but we only do this so that the `timeoutCallback` can free all allocated + // memory and we won't report a leak. + fn hasPendinEvents(self: *const Self) bool { + if (self.pending_network_count > 0) { + return true; + } + + if (builtin.mode != .Debug) { + return false; + } + return self.pending_timeout_count > 0; + } + // Retrieve all registred I/O events completed by OS kernel, // and execute sequentially their callbacks. // Stops when there is no more I/O events registered on the loop. @@ -103,25 +121,12 @@ pub const Loop = struct { self.stopping = true; defer self.stopping = false; - while (self.eventsNb() > 0) { + while (self.pending_network_count > 0) { try self.io.run_for_ns(10 * std.time.ns_per_ms); // at each iteration we might have new events registred by previous callbacks } } - // Register events atomically - // - add 1 event and return previous value - fn addEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Add, 1, .acq_rel); - } - // - remove 1 event and return previous value - fn removeEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Sub, 1, .acq_rel); - } - // - get the number of current events - fn eventsNb(self: *Self) usize { - return @atomicLoad(usize, &self.events_nb, .seq_cst); - } // JS callbacks APIs // ----------------- @@ -152,7 +157,7 @@ pub const Loop = struct { const loop = ctx.loop; if (ctx.initial) { - loop.removeEvent(); + loop.pending_timeout_count -= 1; } defer { @@ -207,7 +212,7 @@ pub const Loop = struct { .callback_node = callback_node, }; - self.addEvent(); + self.pending_timeout_count += 1; self.scheduleTimeout(nanoseconds, ctx, completion); return @intFromPtr(completion); } @@ -244,17 +249,18 @@ pub const Loop = struct { ) !void { const onConnect = struct { fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onConnect; + const callback = try self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address); } @@ -271,8 +277,8 @@ pub const Loop = struct { ) !void { const onSend = struct { fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onSend; @@ -281,7 +287,7 @@ pub const Loop = struct { errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf); } @@ -298,8 +304,8 @@ pub const Loop = struct { ) !void { const onRecv = struct { fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onRecv; @@ -307,8 +313,7 @@ pub const Loop = struct { const callback = try self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - - self.addEvent(); + self.pending_network_count += 1; self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf); } };