diff --git a/src/app.zig b/src/app.zig index 2501d832..803c55f0 100644 --- a/src/app.zig +++ b/src/app.zig @@ -52,7 +52,8 @@ pub const App = struct { .telemetry = undefined, .app_dir_path = app_dir_path, .notification = notification, - .http_client = try HttpClient.init(allocator, 5, .{ + .http_client = try HttpClient.init(allocator, .{ + .max_concurrent = 3, .http_proxy = config.http_proxy, .tls_verify_host = config.tls_verify_host, }), diff --git a/src/browser/html/window.zig b/src/browser/html/window.zig index d65b47fa..02381971 100644 --- a/src/browser/html/window.zig +++ b/src/browser/html/window.zig @@ -164,7 +164,7 @@ pub const Window = struct { } pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 { - return self.createTimeout(cbk, 5, page, .{.animation_frame = true}); + return self.createTimeout(cbk, 5, page, .{ .animation_frame = true }); } pub fn _cancelAnimationFrame(self: *Window, id: u32, page: *Page) !void { @@ -179,7 +179,7 @@ pub const Window = struct { // TODO handle callback arguments. pub fn _setInterval(self: *Window, cbk: Function, delay: ?u32, page: *Page) !u32 { - return self.createTimeout(cbk, delay, page, .{.repeat = true}); + return self.createTimeout(cbk, delay, page, .{ .repeat = true }); } pub fn _clearTimeout(self: *Window, id: u32, page: *Page) !void { diff --git a/src/browser/page.zig b/src/browser/page.zig index 83f1136b..7044a4d9 100644 --- a/src/browser/page.zig +++ b/src/browser/page.zig @@ -113,7 +113,9 @@ pub const Page = struct { .cookie_jar = &session.cookie_jar, .microtask_node = .{ .func = microtaskCallback }, .window_clicked_event_node = .{ .func = windowClicked }, - .request_factory = browser.http_client.requestFactory(browser.notification), + .request_factory = browser.http_client.requestFactory(.{ + .notification = browser.notification, + }), .scope = undefined, .module_map = .empty, }; @@ -205,58 +207,63 @@ pub const Page = struct { // redirect) self.url = request_url; - // load the data - var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); - defer request.deinit(); - request.body = opts.body; - request.notification = notification; + { + // block exists to limit the lifetime of the request, which holds + // onto a connection + var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); + defer request.deinit(); - notification.dispatch(.page_navigate, &.{ - .opts = opts, - .url = &self.url, - .timestamp = timestamp(), - }); + request.body = opts.body; + request.notification = notification; - var response = try request.sendSync(.{}); + notification.dispatch(.page_navigate, &.{ + .opts = opts, + .url = &self.url, + .timestamp = timestamp(), + }); - // would be different than self.url in the case of a redirect - self.url = try URL.fromURI(arena, request.request_uri); + var response = try request.sendSync(.{}); - const header = response.header; - try session.cookie_jar.populateFromResponse(&self.url.uri, &header); + // would be different than self.url in the case of a redirect + self.url = try URL.fromURI(arena, request.request_uri); - // TODO handle fragment in url. - try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); + const header = response.header; + try session.cookie_jar.populateFromResponse(&self.url.uri, &header); - const content_type = header.get("content-type"); + // TODO handle fragment in url. + try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); - const mime: Mime = blk: { - if (content_type) |ct| { - break :blk try Mime.parse(arena, ct); + const content_type = header.get("content-type"); + + const mime: Mime = blk: { + if (content_type) |ct| { + break :blk try Mime.parse(arena, ct); + } + break :blk Mime.sniff(try response.peek()); + } orelse .unknown; + + log.info(.http, "navigation", .{ + .status = header.status, + .content_type = content_type, + .charset = mime.charset, + .url = request_url, + }); + + if (!mime.isHTML()) { + var arr: std.ArrayListUnmanaged(u8) = .{}; + while (try response.next()) |data| { + try arr.appendSlice(arena, try arena.dupe(u8, data)); + } + // save the body into the page. + self.raw_data = arr.items; + return; } - break :blk Mime.sniff(try response.peek()); - } orelse .unknown; - log.info(.http, "navigation", .{ - .status = header.status, - .content_type = content_type, - .charset = mime.charset, - .url = request_url, - }); - - if (mime.isHTML()) { - self.raw_data = null; try self.loadHTMLDoc(&response, mime.charset orelse "utf-8"); - try self.processHTMLDoc(); - } else { - var arr: std.ArrayListUnmanaged(u8) = .{}; - while (try response.next()) |data| { - try arr.appendSlice(arena, try arena.dupe(u8, data)); - } - // save the body into the page. - self.raw_data = arr.items; } + try self.processHTMLDoc(); + notification.dispatch(.page_navigated, &.{ .url = &self.url, .timestamp = timestamp(), diff --git a/src/browser/session.zig b/src/browser/session.zig index 44fce9d3..5d550a5e 100644 --- a/src/browser/session.zig +++ b/src/browser/session.zig @@ -72,7 +72,7 @@ pub const Session = struct { pub fn deinit(self: *Session) void { if (self.page != null) { - self.removePage(); + self.removePage() catch {}; } self.cookie_jar.deinit(); self.storage_shed.deinit(); @@ -104,14 +104,35 @@ pub const Session = struct { return page; } - pub fn removePage(self: *Session) void { + pub fn removePage(self: *Session) !void { // Inform CDP the page is going to be removed, allowing other worlds to remove themselves before the main one self.browser.notification.dispatch(.page_remove, .{}); std.debug.assert(self.page != null); - // Reset all existing callbacks. - self.browser.app.loop.reset(); + + // Cleanup is a bit sensitive. We could still have inflight I/O. For + // example, we could have an XHR request which is still in the connect + // phase. It's important that we clean these up, as they're holding onto + // limited resources (like our fixed-sized http state pool). + // + // First thing we do, is endScope() which will execute the destructor + // of any type that registered a destructor (e.g. XMLHttpRequest). + // This will shutdown any pending sockets, which begins our cleaning + // processed self.executor.endScope(); + + // Second thing we do is reset the loop. This increments the loop ctx_id + // so that any "stale" timeouts we process will get ignored. We need to + // do this BEFORE running the loop because, at this point, things like + // window.setTimeout and running microtasks should be ignored + self.browser.app.loop.reset(); + + // Finally, we run the loop. Because of the reset just above, this will + // ignore any timeouts. And, because of the endScope about this, it + // should ensure that the http requests detect the shutdown socket and + // release their resources. + try self.browser.app.loop.run(); + self.page = null; // clear netsurf memory arena. @@ -143,7 +164,7 @@ pub const Session = struct { // the final URL, possibly following redirects) const url = try self.page.?.url.resolve(self.transfer_arena, url_string); - self.removePage(); + try self.removePage(); var page = try self.createPage(); return page.navigate(url, opts); } diff --git a/src/browser/xhr/xhr.zig b/src/browser/xhr/xhr.zig index 3f1dfc2a..52428a0f 100644 --- a/src/browser/xhr/xhr.zig +++ b/src/browser/xhr/xhr.zig @@ -30,6 +30,7 @@ const Mime = @import("../mime.zig").Mime; const parser = @import("../netsurf.zig"); const http = @import("../../http/client.zig"); const Page = @import("../page.zig").Page; +const Loop = @import("../../runtime/loop.zig").Loop; const CookieJar = @import("../storage/storage.zig").CookieJar; // XHR interfaces @@ -78,6 +79,7 @@ const XMLHttpRequestBodyInit = union(enum) { pub const XMLHttpRequest = struct { proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{}, + loop: *Loop, arena: Allocator, request: ?*http.Request = null, @@ -91,6 +93,7 @@ pub const XMLHttpRequest = struct { sync: bool = true, err: ?anyerror = null, last_dispatch: i64 = 0, + request_body: ?[]const u8 = null, cookie_jar: *CookieJar, // the URI of the page where this request is originating from @@ -241,12 +244,13 @@ pub const XMLHttpRequest = struct { pub fn constructor(page: *Page) !XMLHttpRequest { const arena = page.arena; return .{ + .url = null, .arena = arena, + .loop = page.loop, .headers = Headers.init(arena), .response_headers = Headers.init(arena), .method = undefined, .state = .unsent, - .url = null, .origin_url = &page.url, .cookie_jar = page.cookie_jar, }; @@ -422,10 +426,23 @@ pub const XMLHttpRequest = struct { log.debug(.http, "request", .{ .method = self.method, .url = self.url, .source = "xhr" }); self.send_flag = true; + if (body) |b| { + self.request_body = try self.arena.dupe(u8, b); + } - self.request = try page.request_factory.create(self.method, &self.url.?.uri); - var request = self.request.?; - errdefer request.deinit(); + try page.request_factory.initAsync( + page.arena, + self.method, + &self.url.?.uri, + self, + onHttpRequestReady, + self.loop, + ); + } + + fn onHttpRequestReady(ctx: *anyopaque, request: *http.Request) !void { + // on error, our caller will cleanup request + const self: *XMLHttpRequest = @alignCast(@ptrCast(ctx)); for (self.headers.list.items) |hdr| { try request.addHeader(hdr.name, hdr.value, .{}); @@ -433,7 +450,7 @@ pub const XMLHttpRequest = struct { { var arr: std.ArrayListUnmanaged(u8) = .{}; - try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(page.arena), .{ + try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(self.arena), .{ .navigation = false, .origin_uri = &self.origin_url.uri, }); @@ -447,14 +464,15 @@ pub const XMLHttpRequest = struct { // if the request method is GET or HEAD. // https://xhr.spec.whatwg.org/#the-send()-method // var used_body: ?XMLHttpRequestBodyInit = null; - if (body) |b| { + if (self.request_body) |b| { if (self.method != .GET and self.method != .HEAD) { - request.body = try page.arena.dupe(u8, b); + request.body = b; try request.addHeader("Content-Type", "text/plain; charset=UTF-8", .{}); } } - try request.sendAsync(page.loop, self, .{}); + try request.sendAsync(self.loop, self, .{}); + self.request = request; } pub fn onHttpResponse(self: *XMLHttpRequest, progress_: anyerror!http.Progress) !void { diff --git a/src/cdp/domains/input.zig b/src/cdp/domains/input.zig index b6897dc2..170098ff 100644 --- a/src/cdp/domains/input.zig +++ b/src/cdp/domains/input.zig @@ -90,7 +90,7 @@ fn clickNavigate(cmd: anytype, uri: std.Uri) !void { .disposition = "currentTab", }, .{ .session_id = bc.session_id.? }); - bc.session.removePage(); + try bc.session.removePage(); _ = try bc.session.createPage(null); try @import("page.zig").navigateToUrl(cmd, url, false); diff --git a/src/cdp/domains/target.zig b/src/cdp/domains/target.zig index f6c197ce..77b46d0a 100644 --- a/src/cdp/domains/target.zig +++ b/src/cdp/domains/target.zig @@ -220,7 +220,7 @@ fn closeTarget(cmd: anytype) !void { bc.session_id = null; } - bc.session.removePage(); + try bc.session.removePage(); if (bc.isolated_world) |*world| { world.deinit(); bc.isolated_world = null; diff --git a/src/http/client.zig b/src/http/client.zig index 968402b7..83536c4e 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -54,16 +54,17 @@ pub const Client = struct { request_pool: std.heap.MemoryPool(Request), const Opts = struct { - tls_verify_host: bool = true, + max_concurrent: usize = 3, http_proxy: ?std.Uri = null, + tls_verify_host: bool = true, max_idle_connection: usize = 10, }; - pub fn init(allocator: Allocator, max_concurrent: usize, opts: Opts) !Client { + pub fn init(allocator: Allocator, opts: Opts) !Client { var root_ca: tls.config.CertBundle = if (builtin.is_test) .{} else try tls.config.CertBundle.fromSystem(allocator); errdefer root_ca.deinit(allocator); - const state_pool = try StatePool.init(allocator, max_concurrent); + const state_pool = try StatePool.init(allocator, opts.max_concurrent); errdefer state_pool.deinit(allocator); const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection); @@ -92,13 +93,62 @@ pub const Client = struct { } pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !*Request { - const state = self.state_pool.acquire(); + const state = self.state_pool.acquireWait(); + errdefer self.state_pool.release(state); - errdefer { - state.reset(); - self.state_pool.release(state); + const req = try self.request_pool.create(); + errdefer self.request_pool.destroy(req); + + req.* = try Request.init(self, state, method, uri); + return req; + } + + pub fn initAsync( + self: *Client, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + opts: RequestOpts, + ) !void { + if (self.state_pool.acquireOrNull()) |state| { + // if we have state ready, we can skip the loop and immediately + // kick this request off. + return self.asyncRequestReady(method, uri, ctx, callback, state, opts); } + // This cannot be a client-owned MemoryPool. The page can end before + // this is ever completed (and the check callback will never be called). + // As long as the loop doesn't guarantee that callbacks will be called, + // this _has_ to be the page arena. + const queue = try arena.create(AsyncQueue); + queue.* = .{ + .ctx = ctx, + .uri = uri, + .opts = opts, + .client = self, + .method = method, + .callback = callback, + .node = .{ .func = AsyncQueue.check }, + }; + _ = try loop.timeout(10 * std.time.ns_per_ms, &queue.node); + } + + // Either called directly from initAsync (if we have a state ready) + // Or from when the AsyncQueue(T) is ready. + fn asyncRequestReady( + self: *Client, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + state: *State, + opts: RequestOpts, + ) !void { + errdefer self.state_pool.release(state); + // We need the request on the heap, because it can have a longer lifetime // than the code making the request. That sounds odd, but consider the // case of an XHR request: it can still be inflight (e.g. waiting for @@ -110,26 +160,78 @@ pub const Client = struct { errdefer self.request_pool.destroy(req); req.* = try Request.init(self, state, method, uri); - return req; + if (opts.notification) |notification| { + req.notification = notification; + } + + errdefer req.deinit(); + try callback(ctx, req); } - pub fn requestFactory(self: *Client, notification: ?*Notification) RequestFactory { + pub fn requestFactory(self: *Client, opts: RequestOpts) RequestFactory { return .{ + .opts = opts, .client = self, - .notification = notification, }; } }; +const RequestOpts = struct { + notification: ?*Notification = null, +}; + // A factory for creating requests with a given set of options. pub const RequestFactory = struct { client: *Client, - notification: ?*Notification, + opts: RequestOpts, - pub fn create(self: RequestFactory, method: Request.Method, uri: *const Uri) !*Request { - var req = try self.client.request(method, uri); - req.notification = self.notification; - return req; + pub fn initAsync( + self: RequestFactory, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + ) !void { + return self.client.initAsync(arena, method, uri, ctx, callback, loop, self.opts); + } +}; + +const AsyncQueue = struct { + ctx: *anyopaque, + method: Request.Method, + uri: *const Uri, + client: *Client, + opts: RequestOpts, + node: Loop.CallbackNode, + callback: Callback, + + const Callback = *const fn (*anyopaque, *Request) anyerror!void; + + fn check(node: *Loop.CallbackNode, repeat_delay: *?u63) void { + const self: *AsyncQueue = @fieldParentPtr("node", node); + self._check(repeat_delay) catch |err| { + log.err(.http_client, "async queue check", .{ .err = err }); + }; + } + + fn _check(self: *AsyncQueue, repeat_delay: *?u63) !void { + const client = self.client; + const state = client.state_pool.acquireOrNull() orelse { + // re-run this function in 10 milliseconds + repeat_delay.* = 10 * std.time.ns_per_ms; + return; + }; + + try client.asyncRequestReady( + self.method, + self.uri, + self.ctx, + self.callback, + state, + self.opts, + ); } }; @@ -321,7 +423,6 @@ pub const Request = struct { pub fn deinit(self: *Request) void { self.releaseConnection(); - _ = self._state.reset(); self._client.state_pool.release(self._state); self._client.request_pool.destroy(self); } @@ -576,6 +677,7 @@ pub const Request = struct { if (self._connection_from_keepalive) { // we're already connected + async_handler.pending_connect = false; return async_handler.conn.connected(); } @@ -814,6 +916,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { shutdown: bool = false, pending_write: bool = false, pending_receive: bool = false, + pending_connect: bool = true, const Self = @This(); const SendQueue = std.DoublyLinkedList([]const u8); @@ -838,10 +941,15 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn abort(ctx: *anyopaque) void { var self: *Self = @alignCast(@ptrCast(ctx)); self.shutdown = true; + posix.shutdown(self.request._connection.?.socket, .both) catch {}; self.maybeShutdown(); } fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void { + self.pending_connect = false; + if (self.shutdown) { + return self.maybeShutdown(); + } result catch |err| return self.handleError("Connection failed", err); self.conn.connected() catch |err| { self.handleError("connected handler error", err); @@ -1008,7 +1116,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn maybeShutdown(self: *Self) void { std.debug.assert(self.shutdown); - if (self.pending_write or self.pending_receive) { + if (self.pending_write or self.pending_receive or self.pending_connect) { return; } @@ -1137,6 +1245,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { self.handleError("decompression error", err); return .done; }; + self.handler.onHttpResponse(.{ .data = chunk, .first = first, @@ -2346,7 +2455,7 @@ const State = struct { } fn reset(self: *State) void { - _ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 }); + _ = self.arena.reset(.{ .retain_with_limit = 64 * 1024 }); } fn deinit(self: *State) void { @@ -2399,10 +2508,11 @@ const StatePool = struct { allocator.free(self.states); } - pub fn acquire(self: *StatePool) *State { + pub fn acquireWait(self: *StatePool) *State { + const states = self.states; + self.mutex.lock(); while (true) { - const states = self.states; const available = self.available; if (available == 0) { self.cond.wait(&self.mutex); @@ -2416,13 +2526,33 @@ const StatePool = struct { } } - pub fn release(self: *StatePool, state: *State) void { + pub fn acquireOrNull(self: *StatePool) ?*State { + const states = self.states; + self.mutex.lock(); + defer self.mutex.unlock(); + + const available = self.available; + if (available == 0) { + return null; + } + + const index = available - 1; + const state = states[index]; + self.available = index; + return state; + } + + pub fn release(self: *StatePool, state: *State) void { + state.reset(); var states = self.states; + + self.mutex.lock(); const available = self.available; states[available] = state; self.available = available + 1; self.mutex.unlock(); + self.cond.signal(); } }; @@ -2823,11 +2953,19 @@ test "HttpClient: sync GET redirect" { } test "HttpClient: async connect error" { + defer testing.reset(); var loop = try Loop.init(testing.allocator); defer loop.deinit(); const Handler = struct { + loop: *Loop, reset: *Thread.ResetEvent, + + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *@This() = @alignCast(@ptrCast(ctx)); + try req.sendAsync(self.loop, self, .{}); + } + fn onHttpResponse(self: *@This(), res: anyerror!Progress) !void { _ = res catch |err| { if (err == error.ConnectionRefused) { @@ -2845,14 +2983,29 @@ test "HttpClient: async connect error" { var client = try testClient(); defer client.deinit(); + var handler = Handler{ + .loop = &loop, + .reset = &reset, + }; + const uri = try Uri.parse("HTTP://127.0.0.1:9920"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&loop, Handler{ .reset = &reset }, .{}); + try client.initAsync( + testing.arena_allocator, + .GET, + &uri, + &handler, + Handler.requestReady, + &loop, + .{}, + ); + try loop.io.run_for_ns(std.time.ns_per_ms); try reset.timedWait(std.time.ns_per_s); } test "HttpClient: async no body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2860,8 +3013,7 @@ test "HttpClient: async no body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2871,6 +3023,8 @@ test "HttpClient: async no body" { } test "HttpClient: async with body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2878,8 +3032,7 @@ test "HttpClient: async with body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/echo"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2894,6 +3047,8 @@ test "HttpClient: async with body" { } test "HttpClient: async with gzip body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2901,8 +3056,7 @@ test "HttpClient: async with gzip body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/gzip"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2916,6 +3070,8 @@ test "HttpClient: async with gzip body" { } test "HttpClient: async redirect" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2923,8 +3079,7 @@ test "HttpClient: async redirect" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/redirect"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); // Called twice on purpose. The initial GET resutls in the # of pending // events to reach 0. This causes our `run_for_ns` to return. But we then @@ -2945,6 +3100,7 @@ test "HttpClient: async redirect" { } test "HttpClient: async tls no body" { + defer testing.reset(); var client = try testClient(); defer client.deinit(); for (0..5) |_| { @@ -2952,8 +3108,7 @@ test "HttpClient: async tls no body" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2969,6 +3124,7 @@ test "HttpClient: async tls no body" { } test "HttpClient: async tls with body x" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -2977,8 +3133,7 @@ test "HttpClient: async tls with body x" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/body"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2993,6 +3148,7 @@ test "HttpClient: async tls with body x" { } test "HttpClient: async redirect from TLS to Plaintext" { + defer testing.reset(); for (0..1) |_| { var client = try testClient(); defer client.deinit(); @@ -3001,8 +3157,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { defer handler.deinit(); const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -3018,6 +3173,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { } test "HttpClient: async redirect plaintext to TLS" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -3026,8 +3182,7 @@ test "HttpClient: async redirect plaintext to TLS" { defer handler.deinit(); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -3149,6 +3304,11 @@ const CaptureHandler = struct { self.loop.deinit(); } + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *CaptureHandler = @alignCast(@ptrCast(ctx)); + try req.sendAsync(&self.loop, self, .{ .tls_verify_host = false }); + } + fn onHttpResponse(self: *CaptureHandler, progress_: anyerror!Progress) !void { self.process(progress_) catch |err| { std.debug.print("capture handler error: {}\n", .{err}); @@ -3230,5 +3390,5 @@ fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { } fn testClient() !Client { - return try Client.init(testing.allocator, 1, .{}); + return try Client.init(testing.allocator, .{ .max_concurrent = 1 }); } diff --git a/src/runtime/loop.zig b/src/runtime/loop.zig index 0a954e18..a1af200e 100644 --- a/src/runtime/loop.zig +++ b/src/runtime/loop.zig @@ -34,9 +34,11 @@ pub const Loop = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? io: IO, - // Used to track how many callbacks are to be called and wait until all - // event are finished. - events_nb: usize, + // number of pending network events we have + pending_network_count: usize, + + // number of pending timeout events we have + pending_timeout_count: usize, // Used to stop repeating timeouts when loop.run is called. stopping: bool, @@ -66,8 +68,9 @@ pub const Loop = struct { .alloc = alloc, .cancelled = .{}, .io = try IO.init(32, 0), - .events_nb = 0, .stopping = false, + .pending_network_count = 0, + .pending_timeout_count = 0, .timeout_pool = MemoryPool(ContextTimeout).init(alloc), .event_callback_pool = MemoryPool(EventCallbackContext).init(alloc), }; @@ -78,7 +81,7 @@ pub const Loop = struct { // run tail events. We do run the tail events to ensure all the // contexts are correcly free. - while (self.eventsNb() > 0) { + while (self.hasPendinEvents()) { self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| { log.err(.loop, "deinit", .{ .err = err }); break; @@ -93,6 +96,21 @@ pub const Loop = struct { self.cancelled.deinit(self.alloc); } + // We can shutdown once all the pending network IO is complete. + // In debug mode we also wait until al the pending timeouts are complete + // but we only do this so that the `timeoutCallback` can free all allocated + // memory and we won't report a leak. + fn hasPendinEvents(self: *const Self) bool { + if (self.pending_network_count > 0) { + return true; + } + + if (builtin.mode != .Debug) { + return false; + } + return self.pending_timeout_count > 0; + } + // Retrieve all registred I/O events completed by OS kernel, // and execute sequentially their callbacks. // Stops when there is no more I/O events registered on the loop. @@ -103,25 +121,12 @@ pub const Loop = struct { self.stopping = true; defer self.stopping = false; - while (self.eventsNb() > 0) { + while (self.pending_network_count > 0) { try self.io.run_for_ns(10 * std.time.ns_per_ms); // at each iteration we might have new events registred by previous callbacks } } - // Register events atomically - // - add 1 event and return previous value - fn addEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Add, 1, .acq_rel); - } - // - remove 1 event and return previous value - fn removeEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Sub, 1, .acq_rel); - } - // - get the number of current events - fn eventsNb(self: *Self) usize { - return @atomicLoad(usize, &self.events_nb, .seq_cst); - } // JS callbacks APIs // ----------------- @@ -152,7 +157,7 @@ pub const Loop = struct { const loop = ctx.loop; if (ctx.initial) { - loop.removeEvent(); + loop.pending_timeout_count -= 1; } defer { @@ -207,7 +212,7 @@ pub const Loop = struct { .callback_node = callback_node, }; - self.addEvent(); + self.pending_timeout_count += 1; self.scheduleTimeout(nanoseconds, ctx, completion); return @intFromPtr(completion); } @@ -244,17 +249,18 @@ pub const Loop = struct { ) !void { const onConnect = struct { fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onConnect; + const callback = try self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address); } @@ -271,8 +277,8 @@ pub const Loop = struct { ) !void { const onSend = struct { fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onSend; @@ -281,7 +287,7 @@ pub const Loop = struct { errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf); } @@ -298,8 +304,8 @@ pub const Loop = struct { ) !void { const onRecv = struct { fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onRecv; @@ -307,8 +313,7 @@ pub const Loop = struct { const callback = try self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - - self.addEvent(); + self.pending_network_count += 1; self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf); } };