Merge pull request #767 from lightpanda-io/unblock_async_http_request

Unblock async http request
This commit is contained in:
Karl Seguin
2025-06-06 13:22:29 +08:00
committed by GitHub
9 changed files with 339 additions and 127 deletions

View File

@@ -52,7 +52,8 @@ pub const App = struct {
.telemetry = undefined,
.app_dir_path = app_dir_path,
.notification = notification,
.http_client = try HttpClient.init(allocator, 5, .{
.http_client = try HttpClient.init(allocator, .{
.max_concurrent = 3,
.http_proxy = config.http_proxy,
.tls_verify_host = config.tls_verify_host,
}),

View File

@@ -164,7 +164,7 @@ pub const Window = struct {
}
pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 {
return self.createTimeout(cbk, 5, page, .{.animation_frame = true});
return self.createTimeout(cbk, 5, page, .{ .animation_frame = true });
}
pub fn _cancelAnimationFrame(self: *Window, id: u32, page: *Page) !void {
@@ -179,7 +179,7 @@ pub const Window = struct {
// TODO handle callback arguments.
pub fn _setInterval(self: *Window, cbk: Function, delay: ?u32, page: *Page) !u32 {
return self.createTimeout(cbk, delay, page, .{.repeat = true});
return self.createTimeout(cbk, delay, page, .{ .repeat = true });
}
pub fn _clearTimeout(self: *Window, id: u32, page: *Page) !void {

View File

@@ -113,7 +113,9 @@ pub const Page = struct {
.cookie_jar = &session.cookie_jar,
.microtask_node = .{ .func = microtaskCallback },
.window_clicked_event_node = .{ .func = windowClicked },
.request_factory = browser.http_client.requestFactory(browser.notification),
.request_factory = browser.http_client.requestFactory(.{
.notification = browser.notification,
}),
.scope = undefined,
.module_map = .empty,
};
@@ -205,58 +207,63 @@ pub const Page = struct {
// redirect)
self.url = request_url;
// load the data
var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true });
defer request.deinit();
request.body = opts.body;
request.notification = notification;
{
// block exists to limit the lifetime of the request, which holds
// onto a connection
var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true });
defer request.deinit();
notification.dispatch(.page_navigate, &.{
.opts = opts,
.url = &self.url,
.timestamp = timestamp(),
});
request.body = opts.body;
request.notification = notification;
var response = try request.sendSync(.{});
notification.dispatch(.page_navigate, &.{
.opts = opts,
.url = &self.url,
.timestamp = timestamp(),
});
// would be different than self.url in the case of a redirect
self.url = try URL.fromURI(arena, request.request_uri);
var response = try request.sendSync(.{});
const header = response.header;
try session.cookie_jar.populateFromResponse(&self.url.uri, &header);
// would be different than self.url in the case of a redirect
self.url = try URL.fromURI(arena, request.request_uri);
// TODO handle fragment in url.
try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) });
const header = response.header;
try session.cookie_jar.populateFromResponse(&self.url.uri, &header);
const content_type = header.get("content-type");
// TODO handle fragment in url.
try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) });
const mime: Mime = blk: {
if (content_type) |ct| {
break :blk try Mime.parse(arena, ct);
const content_type = header.get("content-type");
const mime: Mime = blk: {
if (content_type) |ct| {
break :blk try Mime.parse(arena, ct);
}
break :blk Mime.sniff(try response.peek());
} orelse .unknown;
log.info(.http, "navigation", .{
.status = header.status,
.content_type = content_type,
.charset = mime.charset,
.url = request_url,
});
if (!mime.isHTML()) {
var arr: std.ArrayListUnmanaged(u8) = .{};
while (try response.next()) |data| {
try arr.appendSlice(arena, try arena.dupe(u8, data));
}
// save the body into the page.
self.raw_data = arr.items;
return;
}
break :blk Mime.sniff(try response.peek());
} orelse .unknown;
log.info(.http, "navigation", .{
.status = header.status,
.content_type = content_type,
.charset = mime.charset,
.url = request_url,
});
if (mime.isHTML()) {
self.raw_data = null;
try self.loadHTMLDoc(&response, mime.charset orelse "utf-8");
try self.processHTMLDoc();
} else {
var arr: std.ArrayListUnmanaged(u8) = .{};
while (try response.next()) |data| {
try arr.appendSlice(arena, try arena.dupe(u8, data));
}
// save the body into the page.
self.raw_data = arr.items;
}
try self.processHTMLDoc();
notification.dispatch(.page_navigated, &.{
.url = &self.url,
.timestamp = timestamp(),

View File

@@ -72,7 +72,7 @@ pub const Session = struct {
pub fn deinit(self: *Session) void {
if (self.page != null) {
self.removePage();
self.removePage() catch {};
}
self.cookie_jar.deinit();
self.storage_shed.deinit();
@@ -104,14 +104,35 @@ pub const Session = struct {
return page;
}
pub fn removePage(self: *Session) void {
pub fn removePage(self: *Session) !void {
// Inform CDP the page is going to be removed, allowing other worlds to remove themselves before the main one
self.browser.notification.dispatch(.page_remove, .{});
std.debug.assert(self.page != null);
// Reset all existing callbacks.
self.browser.app.loop.reset();
// Cleanup is a bit sensitive. We could still have inflight I/O. For
// example, we could have an XHR request which is still in the connect
// phase. It's important that we clean these up, as they're holding onto
// limited resources (like our fixed-sized http state pool).
//
// First thing we do, is endScope() which will execute the destructor
// of any type that registered a destructor (e.g. XMLHttpRequest).
// This will shutdown any pending sockets, which begins our cleaning
// processed
self.executor.endScope();
// Second thing we do is reset the loop. This increments the loop ctx_id
// so that any "stale" timeouts we process will get ignored. We need to
// do this BEFORE running the loop because, at this point, things like
// window.setTimeout and running microtasks should be ignored
self.browser.app.loop.reset();
// Finally, we run the loop. Because of the reset just above, this will
// ignore any timeouts. And, because of the endScope about this, it
// should ensure that the http requests detect the shutdown socket and
// release their resources.
try self.browser.app.loop.run();
self.page = null;
// clear netsurf memory arena.
@@ -143,7 +164,7 @@ pub const Session = struct {
// the final URL, possibly following redirects)
const url = try self.page.?.url.resolve(self.transfer_arena, url_string);
self.removePage();
try self.removePage();
var page = try self.createPage();
return page.navigate(url, opts);
}

View File

@@ -30,6 +30,7 @@ const Mime = @import("../mime.zig").Mime;
const parser = @import("../netsurf.zig");
const http = @import("../../http/client.zig");
const Page = @import("../page.zig").Page;
const Loop = @import("../../runtime/loop.zig").Loop;
const CookieJar = @import("../storage/storage.zig").CookieJar;
// XHR interfaces
@@ -78,6 +79,7 @@ const XMLHttpRequestBodyInit = union(enum) {
pub const XMLHttpRequest = struct {
proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{},
loop: *Loop,
arena: Allocator,
request: ?*http.Request = null,
@@ -91,6 +93,7 @@ pub const XMLHttpRequest = struct {
sync: bool = true,
err: ?anyerror = null,
last_dispatch: i64 = 0,
request_body: ?[]const u8 = null,
cookie_jar: *CookieJar,
// the URI of the page where this request is originating from
@@ -241,12 +244,13 @@ pub const XMLHttpRequest = struct {
pub fn constructor(page: *Page) !XMLHttpRequest {
const arena = page.arena;
return .{
.url = null,
.arena = arena,
.loop = page.loop,
.headers = Headers.init(arena),
.response_headers = Headers.init(arena),
.method = undefined,
.state = .unsent,
.url = null,
.origin_url = &page.url,
.cookie_jar = page.cookie_jar,
};
@@ -422,10 +426,23 @@ pub const XMLHttpRequest = struct {
log.debug(.http, "request", .{ .method = self.method, .url = self.url, .source = "xhr" });
self.send_flag = true;
if (body) |b| {
self.request_body = try self.arena.dupe(u8, b);
}
self.request = try page.request_factory.create(self.method, &self.url.?.uri);
var request = self.request.?;
errdefer request.deinit();
try page.request_factory.initAsync(
page.arena,
self.method,
&self.url.?.uri,
self,
onHttpRequestReady,
self.loop,
);
}
fn onHttpRequestReady(ctx: *anyopaque, request: *http.Request) !void {
// on error, our caller will cleanup request
const self: *XMLHttpRequest = @alignCast(@ptrCast(ctx));
for (self.headers.list.items) |hdr| {
try request.addHeader(hdr.name, hdr.value, .{});
@@ -433,7 +450,7 @@ pub const XMLHttpRequest = struct {
{
var arr: std.ArrayListUnmanaged(u8) = .{};
try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(page.arena), .{
try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(self.arena), .{
.navigation = false,
.origin_uri = &self.origin_url.uri,
});
@@ -447,14 +464,15 @@ pub const XMLHttpRequest = struct {
// if the request method is GET or HEAD.
// https://xhr.spec.whatwg.org/#the-send()-method
// var used_body: ?XMLHttpRequestBodyInit = null;
if (body) |b| {
if (self.request_body) |b| {
if (self.method != .GET and self.method != .HEAD) {
request.body = try page.arena.dupe(u8, b);
request.body = b;
try request.addHeader("Content-Type", "text/plain; charset=UTF-8", .{});
}
}
try request.sendAsync(page.loop, self, .{});
try request.sendAsync(self.loop, self, .{});
self.request = request;
}
pub fn onHttpResponse(self: *XMLHttpRequest, progress_: anyerror!http.Progress) !void {

View File

@@ -90,7 +90,7 @@ fn clickNavigate(cmd: anytype, uri: std.Uri) !void {
.disposition = "currentTab",
}, .{ .session_id = bc.session_id.? });
bc.session.removePage();
try bc.session.removePage();
_ = try bc.session.createPage(null);
try @import("page.zig").navigateToUrl(cmd, url, false);

View File

@@ -220,7 +220,7 @@ fn closeTarget(cmd: anytype) !void {
bc.session_id = null;
}
bc.session.removePage();
try bc.session.removePage();
if (bc.isolated_world) |*world| {
world.deinit();
bc.isolated_world = null;

View File

@@ -54,16 +54,17 @@ pub const Client = struct {
request_pool: std.heap.MemoryPool(Request),
const Opts = struct {
tls_verify_host: bool = true,
max_concurrent: usize = 3,
http_proxy: ?std.Uri = null,
tls_verify_host: bool = true,
max_idle_connection: usize = 10,
};
pub fn init(allocator: Allocator, max_concurrent: usize, opts: Opts) !Client {
pub fn init(allocator: Allocator, opts: Opts) !Client {
var root_ca: tls.config.CertBundle = if (builtin.is_test) .{} else try tls.config.CertBundle.fromSystem(allocator);
errdefer root_ca.deinit(allocator);
const state_pool = try StatePool.init(allocator, max_concurrent);
const state_pool = try StatePool.init(allocator, opts.max_concurrent);
errdefer state_pool.deinit(allocator);
const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection);
@@ -92,13 +93,62 @@ pub const Client = struct {
}
pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !*Request {
const state = self.state_pool.acquire();
const state = self.state_pool.acquireWait();
errdefer self.state_pool.release(state);
errdefer {
state.reset();
self.state_pool.release(state);
const req = try self.request_pool.create();
errdefer self.request_pool.destroy(req);
req.* = try Request.init(self, state, method, uri);
return req;
}
pub fn initAsync(
self: *Client,
arena: Allocator,
method: Request.Method,
uri: *const Uri,
ctx: *anyopaque,
callback: AsyncQueue.Callback,
loop: *Loop,
opts: RequestOpts,
) !void {
if (self.state_pool.acquireOrNull()) |state| {
// if we have state ready, we can skip the loop and immediately
// kick this request off.
return self.asyncRequestReady(method, uri, ctx, callback, state, opts);
}
// This cannot be a client-owned MemoryPool. The page can end before
// this is ever completed (and the check callback will never be called).
// As long as the loop doesn't guarantee that callbacks will be called,
// this _has_ to be the page arena.
const queue = try arena.create(AsyncQueue);
queue.* = .{
.ctx = ctx,
.uri = uri,
.opts = opts,
.client = self,
.method = method,
.callback = callback,
.node = .{ .func = AsyncQueue.check },
};
_ = try loop.timeout(10 * std.time.ns_per_ms, &queue.node);
}
// Either called directly from initAsync (if we have a state ready)
// Or from when the AsyncQueue(T) is ready.
fn asyncRequestReady(
self: *Client,
method: Request.Method,
uri: *const Uri,
ctx: *anyopaque,
callback: AsyncQueue.Callback,
state: *State,
opts: RequestOpts,
) !void {
errdefer self.state_pool.release(state);
// We need the request on the heap, because it can have a longer lifetime
// than the code making the request. That sounds odd, but consider the
// case of an XHR request: it can still be inflight (e.g. waiting for
@@ -110,26 +160,78 @@ pub const Client = struct {
errdefer self.request_pool.destroy(req);
req.* = try Request.init(self, state, method, uri);
return req;
if (opts.notification) |notification| {
req.notification = notification;
}
errdefer req.deinit();
try callback(ctx, req);
}
pub fn requestFactory(self: *Client, notification: ?*Notification) RequestFactory {
pub fn requestFactory(self: *Client, opts: RequestOpts) RequestFactory {
return .{
.opts = opts,
.client = self,
.notification = notification,
};
}
};
const RequestOpts = struct {
notification: ?*Notification = null,
};
// A factory for creating requests with a given set of options.
pub const RequestFactory = struct {
client: *Client,
notification: ?*Notification,
opts: RequestOpts,
pub fn create(self: RequestFactory, method: Request.Method, uri: *const Uri) !*Request {
var req = try self.client.request(method, uri);
req.notification = self.notification;
return req;
pub fn initAsync(
self: RequestFactory,
arena: Allocator,
method: Request.Method,
uri: *const Uri,
ctx: *anyopaque,
callback: AsyncQueue.Callback,
loop: *Loop,
) !void {
return self.client.initAsync(arena, method, uri, ctx, callback, loop, self.opts);
}
};
const AsyncQueue = struct {
ctx: *anyopaque,
method: Request.Method,
uri: *const Uri,
client: *Client,
opts: RequestOpts,
node: Loop.CallbackNode,
callback: Callback,
const Callback = *const fn (*anyopaque, *Request) anyerror!void;
fn check(node: *Loop.CallbackNode, repeat_delay: *?u63) void {
const self: *AsyncQueue = @fieldParentPtr("node", node);
self._check(repeat_delay) catch |err| {
log.err(.http_client, "async queue check", .{ .err = err });
};
}
fn _check(self: *AsyncQueue, repeat_delay: *?u63) !void {
const client = self.client;
const state = client.state_pool.acquireOrNull() orelse {
// re-run this function in 10 milliseconds
repeat_delay.* = 10 * std.time.ns_per_ms;
return;
};
try client.asyncRequestReady(
self.method,
self.uri,
self.ctx,
self.callback,
state,
self.opts,
);
}
};
@@ -321,7 +423,6 @@ pub const Request = struct {
pub fn deinit(self: *Request) void {
self.releaseConnection();
_ = self._state.reset();
self._client.state_pool.release(self._state);
self._client.request_pool.destroy(self);
}
@@ -576,6 +677,7 @@ pub const Request = struct {
if (self._connection_from_keepalive) {
// we're already connected
async_handler.pending_connect = false;
return async_handler.conn.connected();
}
@@ -814,6 +916,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
shutdown: bool = false,
pending_write: bool = false,
pending_receive: bool = false,
pending_connect: bool = true,
const Self = @This();
const SendQueue = std.DoublyLinkedList([]const u8);
@@ -838,10 +941,15 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
fn abort(ctx: *anyopaque) void {
var self: *Self = @alignCast(@ptrCast(ctx));
self.shutdown = true;
posix.shutdown(self.request._connection.?.socket, .both) catch {};
self.maybeShutdown();
}
fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void {
self.pending_connect = false;
if (self.shutdown) {
return self.maybeShutdown();
}
result catch |err| return self.handleError("Connection failed", err);
self.conn.connected() catch |err| {
self.handleError("connected handler error", err);
@@ -1008,7 +1116,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
fn maybeShutdown(self: *Self) void {
std.debug.assert(self.shutdown);
if (self.pending_write or self.pending_receive) {
if (self.pending_write or self.pending_receive or self.pending_connect) {
return;
}
@@ -1137,6 +1245,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type {
self.handleError("decompression error", err);
return .done;
};
self.handler.onHttpResponse(.{
.data = chunk,
.first = first,
@@ -2346,7 +2455,7 @@ const State = struct {
}
fn reset(self: *State) void {
_ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 });
_ = self.arena.reset(.{ .retain_with_limit = 64 * 1024 });
}
fn deinit(self: *State) void {
@@ -2399,10 +2508,11 @@ const StatePool = struct {
allocator.free(self.states);
}
pub fn acquire(self: *StatePool) *State {
pub fn acquireWait(self: *StatePool) *State {
const states = self.states;
self.mutex.lock();
while (true) {
const states = self.states;
const available = self.available;
if (available == 0) {
self.cond.wait(&self.mutex);
@@ -2416,13 +2526,33 @@ const StatePool = struct {
}
}
pub fn release(self: *StatePool, state: *State) void {
pub fn acquireOrNull(self: *StatePool) ?*State {
const states = self.states;
self.mutex.lock();
defer self.mutex.unlock();
const available = self.available;
if (available == 0) {
return null;
}
const index = available - 1;
const state = states[index];
self.available = index;
return state;
}
pub fn release(self: *StatePool, state: *State) void {
state.reset();
var states = self.states;
self.mutex.lock();
const available = self.available;
states[available] = state;
self.available = available + 1;
self.mutex.unlock();
self.cond.signal();
}
};
@@ -2823,11 +2953,19 @@ test "HttpClient: sync GET redirect" {
}
test "HttpClient: async connect error" {
defer testing.reset();
var loop = try Loop.init(testing.allocator);
defer loop.deinit();
const Handler = struct {
loop: *Loop,
reset: *Thread.ResetEvent,
fn requestReady(ctx: *anyopaque, req: *Request) !void {
const self: *@This() = @alignCast(@ptrCast(ctx));
try req.sendAsync(self.loop, self, .{});
}
fn onHttpResponse(self: *@This(), res: anyerror!Progress) !void {
_ = res catch |err| {
if (err == error.ConnectionRefused) {
@@ -2845,14 +2983,29 @@ test "HttpClient: async connect error" {
var client = try testClient();
defer client.deinit();
var handler = Handler{
.loop = &loop,
.reset = &reset,
};
const uri = try Uri.parse("HTTP://127.0.0.1:9920");
var req = try client.request(.GET, &uri);
try req.sendAsync(&loop, Handler{ .reset = &reset }, .{});
try client.initAsync(
testing.arena_allocator,
.GET,
&uri,
&handler,
Handler.requestReady,
&loop,
.{},
);
try loop.io.run_for_ns(std.time.ns_per_ms);
try reset.timedWait(std.time.ns_per_s);
}
test "HttpClient: async no body" {
defer testing.reset();
var client = try testClient();
defer client.deinit();
@@ -2860,8 +3013,7 @@ test "HttpClient: async no body" {
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/simple");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -2871,6 +3023,8 @@ test "HttpClient: async no body" {
}
test "HttpClient: async with body" {
defer testing.reset();
var client = try testClient();
defer client.deinit();
@@ -2878,8 +3032,7 @@ test "HttpClient: async with body" {
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/echo");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -2894,6 +3047,8 @@ test "HttpClient: async with body" {
}
test "HttpClient: async with gzip body" {
defer testing.reset();
var client = try testClient();
defer client.deinit();
@@ -2901,8 +3056,7 @@ test "HttpClient: async with gzip body" {
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/gzip");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -2916,6 +3070,8 @@ test "HttpClient: async with gzip body" {
}
test "HttpClient: async redirect" {
defer testing.reset();
var client = try testClient();
defer client.deinit();
@@ -2923,8 +3079,7 @@ test "HttpClient: async redirect" {
defer handler.deinit();
const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/redirect");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{});
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
// Called twice on purpose. The initial GET resutls in the # of pending
// events to reach 0. This causes our `run_for_ns` to return. But we then
@@ -2945,6 +3100,7 @@ test "HttpClient: async redirect" {
}
test "HttpClient: async tls no body" {
defer testing.reset();
var client = try testClient();
defer client.deinit();
for (0..5) |_| {
@@ -2952,8 +3108,7 @@ test "HttpClient: async tls no body" {
defer handler.deinit();
const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/simple");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -2969,6 +3124,7 @@ test "HttpClient: async tls no body" {
}
test "HttpClient: async tls with body x" {
defer testing.reset();
for (0..5) |_| {
var client = try testClient();
defer client.deinit();
@@ -2977,8 +3133,7 @@ test "HttpClient: async tls with body x" {
defer handler.deinit();
const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/body");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -2993,6 +3148,7 @@ test "HttpClient: async tls with body x" {
}
test "HttpClient: async redirect from TLS to Plaintext" {
defer testing.reset();
for (0..1) |_| {
var client = try testClient();
defer client.deinit();
@@ -3001,8 +3157,7 @@ test "HttpClient: async redirect from TLS to Plaintext" {
defer handler.deinit();
const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -3018,6 +3173,7 @@ test "HttpClient: async redirect from TLS to Plaintext" {
}
test "HttpClient: async redirect plaintext to TLS" {
defer testing.reset();
for (0..5) |_| {
var client = try testClient();
defer client.deinit();
@@ -3026,8 +3182,7 @@ test "HttpClient: async redirect plaintext to TLS" {
defer handler.deinit();
const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure");
var req = try client.request(.GET, &uri);
try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false });
try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{});
try handler.waitUntilDone();
const res = handler.response;
@@ -3149,6 +3304,11 @@ const CaptureHandler = struct {
self.loop.deinit();
}
fn requestReady(ctx: *anyopaque, req: *Request) !void {
const self: *CaptureHandler = @alignCast(@ptrCast(ctx));
try req.sendAsync(&self.loop, self, .{ .tls_verify_host = false });
}
fn onHttpResponse(self: *CaptureHandler, progress_: anyerror!Progress) !void {
self.process(progress_) catch |err| {
std.debug.print("capture handler error: {}\n", .{err});
@@ -3230,5 +3390,5 @@ fn testReader(state: *State, res: *TestResponse, data: []const u8) !void {
}
fn testClient() !Client {
return try Client.init(testing.allocator, 1, .{});
return try Client.init(testing.allocator, .{ .max_concurrent = 1 });
}

View File

@@ -34,9 +34,11 @@ pub const Loop = struct {
alloc: std.mem.Allocator, // TODO: unmanaged version ?
io: IO,
// Used to track how many callbacks are to be called and wait until all
// event are finished.
events_nb: usize,
// number of pending network events we have
pending_network_count: usize,
// number of pending timeout events we have
pending_timeout_count: usize,
// Used to stop repeating timeouts when loop.run is called.
stopping: bool,
@@ -66,8 +68,9 @@ pub const Loop = struct {
.alloc = alloc,
.cancelled = .{},
.io = try IO.init(32, 0),
.events_nb = 0,
.stopping = false,
.pending_network_count = 0,
.pending_timeout_count = 0,
.timeout_pool = MemoryPool(ContextTimeout).init(alloc),
.event_callback_pool = MemoryPool(EventCallbackContext).init(alloc),
};
@@ -78,7 +81,7 @@ pub const Loop = struct {
// run tail events. We do run the tail events to ensure all the
// contexts are correcly free.
while (self.eventsNb() > 0) {
while (self.hasPendinEvents()) {
self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| {
log.err(.loop, "deinit", .{ .err = err });
break;
@@ -93,6 +96,21 @@ pub const Loop = struct {
self.cancelled.deinit(self.alloc);
}
// We can shutdown once all the pending network IO is complete.
// In debug mode we also wait until al the pending timeouts are complete
// but we only do this so that the `timeoutCallback` can free all allocated
// memory and we won't report a leak.
fn hasPendinEvents(self: *const Self) bool {
if (self.pending_network_count > 0) {
return true;
}
if (builtin.mode != .Debug) {
return false;
}
return self.pending_timeout_count > 0;
}
// Retrieve all registred I/O events completed by OS kernel,
// and execute sequentially their callbacks.
// Stops when there is no more I/O events registered on the loop.
@@ -103,25 +121,12 @@ pub const Loop = struct {
self.stopping = true;
defer self.stopping = false;
while (self.eventsNb() > 0) {
while (self.pending_network_count > 0) {
try self.io.run_for_ns(10 * std.time.ns_per_ms);
// at each iteration we might have new events registred by previous callbacks
}
}
// Register events atomically
// - add 1 event and return previous value
fn addEvent(self: *Self) void {
_ = @atomicRmw(usize, &self.events_nb, .Add, 1, .acq_rel);
}
// - remove 1 event and return previous value
fn removeEvent(self: *Self) void {
_ = @atomicRmw(usize, &self.events_nb, .Sub, 1, .acq_rel);
}
// - get the number of current events
fn eventsNb(self: *Self) usize {
return @atomicLoad(usize, &self.events_nb, .seq_cst);
}
// JS callbacks APIs
// -----------------
@@ -152,7 +157,7 @@ pub const Loop = struct {
const loop = ctx.loop;
if (ctx.initial) {
loop.removeEvent();
loop.pending_timeout_count -= 1;
}
defer {
@@ -207,7 +212,7 @@ pub const Loop = struct {
.callback_node = callback_node,
};
self.addEvent();
self.pending_timeout_count += 1;
self.scheduleTimeout(nanoseconds, ctx, completion);
return @intFromPtr(completion);
}
@@ -244,17 +249,18 @@ pub const Loop = struct {
) !void {
const onConnect = struct {
fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
callback.loop.removeEvent();
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onConnect;
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.addEvent();
self.pending_network_count += 1;
self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address);
}
@@ -271,8 +277,8 @@ pub const Loop = struct {
) !void {
const onSend = struct {
fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
callback.loop.removeEvent();
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onSend;
@@ -281,7 +287,7 @@ pub const Loop = struct {
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.addEvent();
self.pending_network_count += 1;
self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf);
}
@@ -298,8 +304,8 @@ pub const Loop = struct {
) !void {
const onRecv = struct {
fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void {
callback.loop.pending_network_count -= 1;
defer callback.loop.event_callback_pool.destroy(callback);
callback.loop.removeEvent();
cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res);
}
}.onRecv;
@@ -307,8 +313,7 @@ pub const Loop = struct {
const callback = try self.event_callback_pool.create();
errdefer self.event_callback_pool.destroy(callback);
callback.* = .{ .loop = self, .ctx = ctx };
self.addEvent();
self.pending_network_count += 1;
self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf);
}
};