diff --git a/src/App.zig b/src/App.zig index 376b79b6..1af5c66a 100644 --- a/src/App.zig +++ b/src/App.zig @@ -27,17 +27,17 @@ const Platform = @import("browser/js/Platform.zig"); const Telemetry = @import("telemetry/telemetry.zig").Telemetry; pub const Http = @import("http/Http.zig"); +pub const Network = Http.Network; pub const ArenaPool = @import("ArenaPool.zig"); pub const Notification = @import("Notification.zig"); const App = @This(); -http: Http, config: *const Config, +network: Network, platform: Platform, snapshot: Snapshot, telemetry: Telemetry, -allocator: Allocator, arena_pool: ArenaPool, app_dir_path: ?[]const u8, notification: *Notification, @@ -48,14 +48,13 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { errdefer allocator.destroy(app); app.config = config; - app.allocator = allocator; + + app.network = try Network.init(allocator, config); + errdefer app.network.deinit(); app.notification = try Notification.init(allocator, null); errdefer app.notification.deinit(); - app.http = try Http.init(allocator, config); - errdefer app.http.deinit(); - app.platform = try Platform.init(); errdefer app.platform.deinit(); @@ -64,7 +63,7 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { app.app_dir_path = getAndMakeAppDir(allocator); - app.telemetry = try Telemetry.init(app, config.mode); + app.telemetry = try Telemetry.init(allocator, app, config.mode); errdefer app.telemetry.deinit(); try app.telemetry.register(app.notification); @@ -75,22 +74,21 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { return app; } -pub fn deinit(self: *App) void { +pub fn deinit(self: *App, allocator: Allocator) void { if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { return; } - const allocator = self.allocator; if (self.app_dir_path) |app_dir_path| { allocator.free(app_dir_path); self.app_dir_path = null; } self.telemetry.deinit(); self.notification.deinit(); - self.http.deinit(); self.snapshot.deinit(); self.platform.deinit(); self.arena_pool.deinit(); + self.network.deinit(); allocator.destroy(self); } diff --git a/src/ArenaPool.zig b/src/ArenaPool.zig index 75adad36..a42e0d42 100644 --- a/src/ArenaPool.zig +++ b/src/ArenaPool.zig @@ -29,6 +29,7 @@ free_list_len: u16 = 0, free_list: ?*Entry = null, free_list_max: u16, entry_pool: std.heap.MemoryPool(Entry), +mutex: std.Thread.Mutex = .{}, const Entry = struct { next: ?*Entry, @@ -41,6 +42,7 @@ pub fn init(allocator: Allocator) ArenaPool { .free_list_max = 512, // TODO make configurable .retain_bytes = 1024 * 16, // TODO make configurable .entry_pool = std.heap.MemoryPool(Entry).init(allocator), + .mutex = .{}, }; } @@ -54,6 +56,9 @@ pub fn deinit(self: *ArenaPool) void { } pub fn acquire(self: *ArenaPool) !Allocator { + self.mutex.lock(); + defer self.mutex.unlock(); + if (self.free_list) |entry| { self.free_list = entry.next; return entry.arena.allocator(); @@ -72,13 +77,18 @@ pub fn release(self: *ArenaPool, allocator: Allocator) void { const arena: *std.heap.ArenaAllocator = @ptrCast(@alignCast(allocator.ptr)); const entry: *Entry = @fieldParentPtr("arena", arena); + // Reset the arena before acquiring the lock to minimize lock hold time + _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); + + self.mutex.lock(); + defer self.mutex.unlock(); + if (self.free_list_len == self.free_list_max) { arena.deinit(); self.entry_pool.destroy(entry); return; } - _ = arena.reset(.{ .retain_with_limit = self.retain_bytes }); entry.next = self.free_list; self.free_list = entry; } diff --git a/src/Config.zig b/src/Config.zig index ed987214..5edecbb6 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -30,6 +30,13 @@ pub const RunMode = enum { version, }; +pub const MAX_HTTP_REQUEST_SIZE = 4096; + +// max message size +// +14 for max websocket payload overhead +// +140 for the max control packet that might be interleaved in a message +pub const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; + mode: Mode, exec_name: []const u8, @@ -116,12 +123,26 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 { }; } +pub fn maxConnections(self: *const Config) u16 { + return switch (self.mode) { + .serve => |opts| opts.max_connections, + else => unreachable, + }; +} + +pub fn maxMemoryPerTab(self: *const Config) usize { + return switch (self.mode) { + .serve => |opts| @intCast(opts.max_memory_per_tab), + else => unreachable, + }; +} + pub fn userAgent(self: *const Config, allocator: Allocator) ![:0]const u8 { const base = "User-Agent: Lightpanda/1.0"; if (self.userAgentSuffix()) |suffix| { return try std.fmt.allocPrintSentinel(allocator, "{s} {s}", .{ base, suffix }, 0); } - return base; + return try allocator.dupeZ(u8, base); } pub const Mode = union(RunMode) { diff --git a/src/LimitedAllocator.zig b/src/LimitedAllocator.zig new file mode 100644 index 00000000..92411cb6 --- /dev/null +++ b/src/LimitedAllocator.zig @@ -0,0 +1,75 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const Allocator = std.mem.Allocator; + +const LimitedAllocator = @This(); + +parent: Allocator, +limit: usize, +allocated: usize = 0, + +pub fn init(parent: Allocator, limit: usize) LimitedAllocator { + return .{ .parent = parent, .limit = limit }; +} + +pub fn allocator(self: *LimitedAllocator) Allocator { + return .{ .ptr = self, .vtable = &vtable }; +} + +const vtable: Allocator.VTable = .{ + .alloc = alloc, + .resize = resize, + .remap = remap, + .free = free, +}; + +fn alloc(ctx: *anyopaque, len: usize, alignment: std.mem.Alignment, ret_addr: usize) ?[*]u8 { + const self: *LimitedAllocator = @ptrCast(@alignCast(ctx)); + if (self.allocated + len > self.limit) return null; + const result = self.parent.rawAlloc(len, alignment, ret_addr); + if (result != null) self.allocated += len; + return result; +} + +fn resize(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) bool { + const self: *LimitedAllocator = @ptrCast(@alignCast(ctx)); + if (new_len > memory.len and self.allocated + new_len - memory.len > self.limit) return false; + if (self.parent.rawResize(memory, alignment, new_len, ret_addr)) { + if (new_len > memory.len) self.allocated += new_len - memory.len else self.allocated -= memory.len - new_len; + return true; + } + return false; +} + +fn remap(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, new_len: usize, ret_addr: usize) ?[*]u8 { + const self: *LimitedAllocator = @ptrCast(@alignCast(ctx)); + if (new_len > memory.len and self.allocated + new_len - memory.len > self.limit) return null; + const result = self.parent.rawRemap(memory, alignment, new_len, ret_addr); + if (result != null) { + if (new_len > memory.len) self.allocated += new_len - memory.len else self.allocated -= memory.len - new_len; + } + return result; +} + +fn free(ctx: *anyopaque, memory: []u8, alignment: std.mem.Alignment, ret_addr: usize) void { + const self: *LimitedAllocator = @ptrCast(@alignCast(ctx)); + self.parent.rawFree(memory, alignment, ret_addr); + self.allocated -= memory.len; +} diff --git a/src/Server.zig b/src/Server.zig index a557d8ed..37a3be3b 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -28,34 +28,31 @@ const ArenaAllocator = std.heap.ArenaAllocator; const log = @import("log.zig"); const App = @import("App.zig"); +const Config = @import("Config.zig"); const CDP = @import("cdp/cdp.zig").CDP; - -const MAX_HTTP_REQUEST_SIZE = 4096; - -// max message size -// +14 for max websocket payload overhead -// +140 for the max control packet that might be interleaved in a message -const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; +const Http = App.Http; +const ThreadPool = @import("ThreadPool.zig"); +const LimitedAllocator = @import("LimitedAllocator.zig"); const Server = @This(); + app: *App, shutdown: bool = false, allocator: Allocator, -client: ?posix.socket_t, listener: ?posix.socket_t, json_version_response: []const u8, +thread_pool: ThreadPool, -pub fn init(app: *App, address: net.Address) !Server { - const allocator = app.allocator; +pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server { const json_version_response = try buildJSONVersionResponse(allocator, address); errdefer allocator.free(json_version_response); return .{ .app = app, - .client = null, .listener = null, .allocator = allocator, .json_version_response = json_version_response, + .thread_pool = ThreadPool.init(allocator, app.config.maxConnections()), }; } @@ -81,12 +78,11 @@ pub fn stop(self: *Server) void { } pub fn deinit(self: *Server) void { + self.thread_pool.deinit(); if (self.listener) |listener| { posix.close(listener); self.listener = null; } - // *if* server.run is running, we should really wait for it to return - // before existing from here. self.allocator.free(self.json_version_response); } @@ -119,97 +115,47 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } }; - self.client = socket; - defer if (self.client) |s| { - posix.close(s); - self.client = null; - }; - - if (log.enabled(.app, .info)) { - var client_address: std.net.Address = undefined; - var socklen: posix.socklen_t = @sizeOf(net.Address); - try std.posix.getsockname(socket, &client_address.any, &socklen); - log.info(.app, "client connected", .{ .ip = client_address }); - } - - self.readLoop(socket, timeout_ms) catch |err| { - log.err(.app, "CDP client loop", .{ .err = err }); + self.thread_pool.spawn(handleConnection, .{ self, socket, timeout_ms }, shutdownConnection, .{socket}) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); }; } } -fn readLoop(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - // This shouldn't be necessary, but the Client is HUGE (> 512KB) because - // it has a large read buffer. I don't know why, but v8 crashes if this - // is on the stack (and I assume it's related to its size). - const client = try self.allocator.create(Client); - defer self.allocator.destroy(client); +fn shutdownConnection(socket: posix.socket_t) void { + posix.shutdown(socket, .recv) catch {}; +} - client.* = try Client.init(socket, self); +fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer posix.close(socket); + + var limited = LimitedAllocator.init(self.allocator, self.app.config.maxMemoryPerTab()); + const client_allocator = limited.allocator(); + + // Client is HUGE (> 512KB) because it has a large read buffer. + // V8 crashes if this is on the stack (likely related to its size). + const client = client_allocator.create(Client) catch |err| { + log.err(.app, "CDP client create", .{ .err = err }); + return; + }; + defer client_allocator.destroy(client); + + client.* = Client.init( + socket, + client_allocator, + self.app, + self.json_version_response, + timeout_ms, + ) catch |err| { + log.err(.app, "CDP client init", .{ .err = err }); + return; + }; defer client.deinit(); - var http = &self.app.http; - http.addCDPClient(.{ - .socket = socket, - .ctx = client, - .blocking_read_start = Client.blockingReadStart, - .blocking_read = Client.blockingRead, - .blocking_read_end = Client.blockingReadStop, - }); - defer http.removeCDPClient(); - - lp.assert(client.mode == .http, "Server.readLoop invalid mode", .{}); - while (true) { - if (http.poll(timeout_ms) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - - if (client.readSocket() == false) { - return; - } - - if (client.mode == .cdp) { - break; // switch to our CDP loop - } - } - - var cdp = &client.mode.cdp; - var last_message = timestamp(.monotonic); - var ms_remaining = timeout_ms; - while (true) { - switch (cdp.pageWait(ms_remaining)) { - .cdp_socket => { - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .no_page => { - if (http.poll(ms_remaining) != .cdp_socket) { - log.info(.app, "CDP timeout", .{}); - return; - } - if (client.readSocket() == false) { - return; - } - last_message = timestamp(.monotonic); - ms_remaining = timeout_ms; - }, - .done => { - const elapsed = timestamp(.monotonic) - last_message; - if (elapsed > ms_remaining) { - log.info(.app, "CDP timeout", .{}); - return; - } - ms_remaining -= @intCast(elapsed); - }, - .navigate => unreachable, // must have been handled by the session - } - } + client.run(); } +// Handle exactly one TCP connection. pub const Client = struct { // The client is initially serving HTTP requests but, under normal circumstances // should eventually be upgraded to a websocket connections @@ -218,11 +164,15 @@ pub const Client = struct { cdp: CDP, }, - server: *Server, + allocator: Allocator, + app: *App, + http: Http, + json_version_response: []const u8, reader: Reader(true), socket: posix.socket_t, socket_flags: usize, send_arena: ArenaAllocator, + timeout_ms: u32, const EMPTY_PONG = [_]u8{ 138, 0 }; @@ -233,22 +183,42 @@ pub const Client = struct { // "private-use" close codes must be from 4000-49999 const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 - fn init(socket: posix.socket_t, server: *Server) !Client { + fn init( + socket: posix.socket_t, + allocator: Allocator, + app: *App, + json_version_response: []const u8, + timeout_ms: u32, + ) !Client { + if (log.enabled(.app, .info)) { + var client_address: std.net.Address = undefined; + var socklen: posix.socklen_t = @sizeOf(net.Address); + try std.posix.getsockname(socket, &client_address.any, &socklen); + log.info(.app, "client connected", .{ .ip = client_address }); + } + const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); // we expect the socket to come to us as nonblocking lp.assert(socket_flags & nonblocking == nonblocking, "Client.init blocking", .{}); - var reader = try Reader(true).init(server.allocator); + var reader = try Reader(true).init(allocator); errdefer reader.deinit(); + var http = try app.network.createHttp(allocator); + errdefer http.deinit(); + return .{ .socket = socket, - .server = server, + .allocator = allocator, + .app = app, + .http = http, + .json_version_response = json_version_response, .reader = reader, .mode = .{ .http = {} }, .socket_flags = socket_flags, - .send_arena = ArenaAllocator.init(server.allocator), + .send_arena = ArenaAllocator.init(allocator), + .timeout_ms = timeout_ms, }; } @@ -259,6 +229,81 @@ pub const Client = struct { } self.reader.deinit(); self.send_arena.deinit(); + self.http.deinit(); + } + + fn run(self: *Client) void { + var http = &self.http; + http.addCDPClient(.{ + .socket = self.socket, + .ctx = self, + .blocking_read_start = Client.blockingReadStart, + .blocking_read = Client.blockingRead, + .blocking_read_end = Client.blockingReadStop, + }); + defer http.removeCDPClient(); + + self.httpLoop(http) catch |err| { + log.err(.app, "CDP client loop", .{ .err = err }); + }; + } + + fn httpLoop(self: *Client, http: anytype) !void { + lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); + while (true) { + if (http.poll(self.timeout_ms) != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + + if (self.readSocket() == false) { + return; + } + + if (self.mode == .cdp) { + break; + } + } + + return self.cdpLoop(http); + } + + fn cdpLoop(self: *Client, http: anytype) !void { + var cdp = &self.mode.cdp; + var last_message = timestamp(.monotonic); + var ms_remaining = self.timeout_ms; + + while (true) { + switch (cdp.pageWait(ms_remaining)) { + .cdp_socket => { + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .no_page => { + if (http.poll(ms_remaining) != .cdp_socket) { + log.info(.app, "CDP timeout", .{}); + return; + } + if (self.readSocket() == false) { + return; + } + last_message = timestamp(.monotonic); + ms_remaining = self.timeout_ms; + }, + .done => { + const elapsed = timestamp(.monotonic) - last_message; + if (elapsed > ms_remaining) { + log.info(.app, "CDP timeout", .{}); + return; + } + ms_remaining -= @intCast(elapsed); + }, + .navigate => unreachable, // must have been handled by the session + } + } } fn blockingReadStart(ctx: *anyopaque) bool { @@ -315,7 +360,7 @@ pub const Client = struct { lp.assert(self.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.reader.pos }); const request = self.reader.buf[0..self.reader.len]; - if (request.len > MAX_HTTP_REQUEST_SIZE) { + if (request.len > Config.MAX_HTTP_REQUEST_SIZE) { self.writeHTTPErrorResponse(413, "Request too large"); return error.RequestTooLarge; } @@ -368,7 +413,7 @@ pub const Client = struct { } if (std.mem.eql(u8, url, "/json/version")) { - try self.send(self.server.json_version_response); + try self.send(self.json_version_response); // Chromedp (a Go driver) does an http request to /json/version // then to / (websocket upgrade) using a different connection. // Since we only allow 1 connection at a time, the 2nd one (the @@ -473,7 +518,7 @@ pub const Client = struct { break :blk res; }; - self.mode = .{ .cdp = try CDP.init(self.server.app, self) }; + self.mode = .{ .cdp = try CDP.init(self.allocator, self.app, &self.http, self) }; return self.send(response); } @@ -708,7 +753,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (message_len > 125) { return error.ControlTooLarge; } - } else if (message_len > MAX_MESSAGE_SIZE) { + } else if (message_len > Config.MAX_MESSAGE_SIZE) { return error.TooLarge; } else if (message_len > self.buf.len) { const len = self.buf.len; @@ -736,7 +781,7 @@ fn Reader(comptime EXPECT_MASK: bool) type { if (is_continuation) { const fragments = &(self.fragments orelse return error.InvalidContinuation); - if (fragments.message.items.len + message_len > MAX_MESSAGE_SIZE) { + if (fragments.message.items.len + message_len > Config.MAX_MESSAGE_SIZE) { return error.TooLarge; } diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig new file mode 100644 index 00000000..55642948 --- /dev/null +++ b/src/ThreadPool.zig @@ -0,0 +1,257 @@ +// Copyright (C) 2023-2025 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const Allocator = std.mem.Allocator; + +const log = @import("log.zig"); + +const ThreadPool = @This(); + +allocator: Allocator, + +active: u16, +shutdown: bool, +max_threads: u16, + +lock: std.Thread.RwLock, +threads: std.DoublyLinkedList, + +const Func = struct { + ptr: *const fn (*anyopaque) void, + args: []u8, + alignment: std.mem.Alignment, + + fn init(allocator: Allocator, func: anytype, args: anytype) !Func { + const Args = @TypeOf(args); + const Wrapper = struct { + fn call(ctx: *anyopaque) void { + const a: *Args = @ptrCast(@alignCast(ctx)); + @call(.auto, func, a.*); + } + }; + + const alignment: std.mem.Alignment = .of(Args); + const size = @sizeOf(Args); + + if (size == 0) { + return .{ + .ptr = Wrapper.call, + .args = &.{}, + .alignment = alignment, + }; + } + + const args_buf = try allocator.alignedAlloc(u8, alignment, size); + + const bytes: []const u8 = @ptrCast((&args)[0..1]); + @memcpy(args_buf, bytes); + + return .{ + .ptr = Wrapper.call, + .args = args_buf, + .alignment = alignment, + }; + } + + fn call(self: Func) void { + self.ptr(@ptrCast(self.args.ptr)); + } + + fn free(self: Func, allocator: Allocator) void { + if (self.args.len > 0) { + allocator.rawFree(self.args, self.alignment, @returnAddress()); + } + } +}; + +const Worker = struct { + run_fn: Func, + shutdown_fn: Func, + pool: *ThreadPool, + thread: std.Thread, + node: std.DoublyLinkedList.Node, + + fn run(self: *Worker) void { + self.run_fn.call(); + self.deinit(); + } + + fn deinit(self: *Worker) void { + const pool = self.pool; + + pool.lock.lock(); + pool.threads.remove(&self.node); + pool.active -= 1; + pool.lock.unlock(); + + self.run_fn.free(pool.allocator); + self.shutdown_fn.free(pool.allocator); + pool.allocator.destroy(self); + } + + fn callShutdown(self: *Worker) void { + self.shutdown_fn.call(); + } +}; + +pub fn init(allocator: Allocator, max_threads: u16) ThreadPool { + return .{ + .allocator = allocator, + .max_threads = max_threads, + .active = 0, + .shutdown = false, + .threads = .{}, + .lock = .{}, + }; +} + +pub fn deinit(self: *ThreadPool) void { + self.join(); +} + +/// Spawn a thread to run run_func(run_args). shutdown_func is called during join(). +pub fn spawn( + self: *ThreadPool, + run_func: anytype, + run_args: std.meta.ArgsTuple(@TypeOf(run_func)), + shutdown_func: anytype, + shutdown_args: std.meta.ArgsTuple(@TypeOf(shutdown_func)), +) !void { + const run_fn = try Func.init(self.allocator, run_func, run_args); + errdefer run_fn.free(self.allocator); + + const shutdown_fn = try Func.init(self.allocator, shutdown_func, shutdown_args); + errdefer shutdown_fn.free(self.allocator); + + const worker = try self.allocator.create(Worker); + errdefer self.allocator.destroy(worker); + + worker.* = .{ + .run_fn = run_fn, + .shutdown_fn = shutdown_fn, + .pool = self, + .thread = undefined, + .node = .{}, + }; + + self.lock.lock(); + defer self.lock.unlock(); + + if (self.shutdown) { + return error.PoolShuttingDown; + } + + if (self.active >= self.max_threads) { + return error.MaxThreadsReached; + } + + self.threads.append(&worker.node); + self.active += 1; + + worker.thread = std.Thread.spawn(.{}, Worker.run, .{worker}) catch |err| { + self.threads.remove(&worker.node); + self.active -= 1; + return err; + }; +} + +/// Number of active threads. +pub fn count(self: *ThreadPool) u16 { + self.lock.lockShared(); + defer self.lock.unlockShared(); + return self.active; +} + +/// Wait for all threads to finish. +pub fn join(self: *ThreadPool) void { + self.lock.lock(); + self.shutdown = true; + + // Call shutdown on all active workers + var node = self.threads.first; + while (node) |n| { + const worker: *Worker = @fieldParentPtr("node", n); + worker.callShutdown(); + node = n.next; + } + self.lock.unlock(); + + while (true) { + self.lock.lockShared(); + const active = self.active; + self.lock.unlockShared(); + + if (active == 0) break; + std.Thread.sleep(10 * std.time.ns_per_ms); + } +} + +pub fn isShuttingDown(self: *ThreadPool) bool { + self.lock.lockShared(); + defer self.lock.unlockShared(); + return self.shutdown; +} + +// Tests +const testing = std.testing; + +fn noop() void {} + +fn increment(counter: *std.atomic.Value(u32)) void { + _ = counter.fetchAdd(1, .acq_rel); +} + +fn block(flag: *std.atomic.Value(bool)) void { + while (!flag.load(.acquire)) { + std.Thread.sleep(1 * std.time.ns_per_ms); + } +} + +fn unblock(flag: *std.atomic.Value(bool)) void { + flag.store(true, .release); +} + +test "ThreadPool: spawn and join" { + var counter = std.atomic.Value(u32).init(0); + var pool = ThreadPool.init(testing.allocator, 4); + defer pool.deinit(); + + try pool.spawn(increment, .{&counter}, noop, .{}); + try pool.spawn(increment, .{&counter}, noop, .{}); + try pool.spawn(increment, .{&counter}, noop, .{}); + + pool.join(); + + try testing.expectEqual(@as(u32, 3), counter.load(.acquire)); + try testing.expectEqual(@as(u16, 0), pool.count()); +} + +test "ThreadPool: max threads limit" { + var flag = std.atomic.Value(bool).init(false); + var pool = ThreadPool.init(testing.allocator, 2); + defer pool.deinit(); + + try pool.spawn(block, .{&flag}, unblock, .{&flag}); + try pool.spawn(block, .{&flag}, unblock, .{&flag}); + + try testing.expectError(error.MaxThreadsReached, pool.spawn(block, .{&flag}, unblock, .{&flag})); + try testing.expectEqual(@as(u16, 2), pool.count()); + + // deinit will call unblock via shutdown callback +} diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index 70b04429..aa00a3e3 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -26,7 +26,8 @@ const log = @import("../log.zig"); const App = @import("../App.zig"); const ArenaPool = App.ArenaPool; -const HttpClient = App.Http.Client; +const Http = App.Http; +const HttpClient = Http.Client; const Notification = App.Notification; const IS_DEBUG = @import("builtin").mode == .Debug; @@ -40,35 +41,33 @@ const Browser = @This(); env: js.Env, app: *App, +http_client: *HttpClient, session: ?Session, allocator: Allocator, arena_pool: *ArenaPool, -http_client: *HttpClient, call_arena: ArenaAllocator, page_arena: ArenaAllocator, session_arena: ArenaAllocator, transfer_arena: ArenaAllocator, notification: *Notification, -pub fn init(app: *App) !Browser { - const allocator = app.allocator; - +pub fn init(allocator: Allocator, app: *App, http_client: *HttpClient) !Browser { var env = try js.Env.init(allocator, &app.platform, &app.snapshot); errdefer env.deinit(); const notification = try Notification.init(allocator, app.notification); - app.http.client.notification = notification; - app.http.client.next_request_id = 0; // Should we track ids in CDP only? + http_client.notification = notification; + http_client.next_request_id = 0; // Should we track ids in CDP only? errdefer notification.deinit(); return .{ .app = app, .env = env, + .http_client = http_client, .session = null, .allocator = allocator, .notification = notification, .arena_pool = &app.arena_pool, - .http_client = app.http.client, .call_arena = ArenaAllocator.init(allocator), .page_arena = ArenaAllocator.init(allocator), .session_arena = ArenaAllocator.init(allocator), diff --git a/src/browser/Session.zig b/src/browser/Session.zig index 340d6b61..e0c91767 100644 --- a/src/browser/Session.zig +++ b/src/browser/Session.zig @@ -66,7 +66,7 @@ pub fn init(self: *Session, browser: *Browser) !void { var executor = try browser.env.newExecutionWorld(); errdefer executor.deinit(); - const allocator = browser.app.allocator; + const allocator = browser.allocator; const session_allocator = browser.session_arena.allocator(); self.* = .{ @@ -86,7 +86,7 @@ pub fn deinit(self: *Session) void { self.removePage(); } self.cookie_jar.deinit(); - self.storage_shed.deinit(self.browser.app.allocator); + self.storage_shed.deinit(self.browser.allocator); self.executor.deinit(); } diff --git a/src/browser/webapi/Navigator.zig b/src/browser/webapi/Navigator.zig index c095ff65..309f8101 100644 --- a/src/browser/webapi/Navigator.zig +++ b/src/browser/webapi/Navigator.zig @@ -27,7 +27,7 @@ _pad: bool = false, pub const init: Navigator = .{}; pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 { - return page._session.browser.app.http.user_agent; + return page._session.browser.app.network.user_agent; } pub fn getAppName(_: *const Navigator) []const u8 { diff --git a/src/cdp/cdp.zig b/src/cdp/cdp.zig index 3f0af4a8..37a2a576 100644 --- a/src/cdp/cdp.zig +++ b/src/cdp/cdp.zig @@ -26,6 +26,7 @@ const log = @import("../log.zig"); const js = @import("../browser/js/js.zig"); const App = @import("../App.zig"); +const Http = App.Http; const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); const Page = @import("../browser/Page.zig"); @@ -78,9 +79,8 @@ pub fn CDPT(comptime TypeProvider: type) type { const Self = @This(); - pub fn init(app: *App, client: TypeProvider.Client) !Self { - const allocator = app.allocator; - const browser = try Browser.init(app); + pub fn init(allocator: Allocator, app: *App, http: *Http, client: TypeProvider.Client) !Self { + const browser = try Browser.init(allocator, app, http.client); errdefer browser.deinit(); return .{ diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 4de1e59f..a174f358 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -85,7 +85,7 @@ const TestContext = struct { self.client = Client.init(self.arena.allocator()); // Don't use the arena here. We want to detect leaks in CDP. // The arena is only for test-specific stuff - self.cdp_ = TestCDP.init(base.test_app, &self.client.?) catch unreachable; + self.cdp_ = TestCDP.init(std.testing.allocator, base.test_app, &base.test_http, &self.client.?) catch unreachable; } return &self.cdp_.?; } diff --git a/src/http/Client.zig b/src/http/Client.zig index c1edd1e7..2b6b8d2e 100644 --- a/src/http/Client.zig +++ b/src/http/Client.zig @@ -168,6 +168,8 @@ pub fn deinit(self: *Client) void { self.abort(); self.handles.deinit(self.allocator); + self.allocator.free(self.user_agent); + _ = c.curl_multi_cleanup(self.multi); self.transfer_pool.deinit(); diff --git a/src/http/Http.zig b/src/http/Http.zig index b619b93f..cf9af628 100644 --- a/src/http/Http.zig +++ b/src/http/Http.zig @@ -41,53 +41,23 @@ const ArenaAllocator = std.heap.ArenaAllocator; // once for all http connections is a win. const Http = @This(); -config: *const Config, +pub const Network = @import("Network.zig"); + +network: *Network, client: *Client, -ca_blob: ?c.curl_blob, -arena: ArenaAllocator, -user_agent: [:0]const u8, -proxy_bearer_header: ?[:0]const u8, -pub fn init(allocator: Allocator, config: *const Config) !Http { - try errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); - errdefer c.curl_global_cleanup(); - - if (comptime ENABLE_DEBUG) { - std.debug.print("curl version: {s}\n\n", .{c.curl_version()}); - } - - var arena = ArenaAllocator.init(allocator); - errdefer arena.deinit(); - - const user_agent = try config.userAgent(arena.allocator()); - - var proxy_bearer_header: ?[:0]const u8 = null; - if (config.proxyBearerToken()) |bt| { - proxy_bearer_header = try std.fmt.allocPrintSentinel(arena.allocator(), "Proxy-Authorization: Bearer {s}", .{bt}, 0); - } - - var ca_blob: ?c.curl_blob = null; - if (config.tlsVerifyHost()) { - ca_blob = try loadCerts(allocator, arena.allocator()); - } - - var client = try Client.init(allocator, ca_blob, config); +pub fn init(allocator: Allocator, network: *Network) !Http { + var client = try Client.init(allocator, network.ca_blob, network.config); errdefer client.deinit(); return .{ - .arena = arena, + .network = network, .client = client, - .ca_blob = ca_blob, - .config = config, - .user_agent = user_agent, - .proxy_bearer_header = proxy_bearer_header, }; } pub fn deinit(self: *Http) void { self.client.deinit(); - c.curl_global_cleanup(); - self.arena.deinit(); } pub fn poll(self: *Http, timeout_ms: u32) Client.PerformStatus { @@ -107,11 +77,11 @@ pub fn removeCDPClient(self: *Http) void { } pub fn newConnection(self: *Http) !Connection { - return Connection.init(self.ca_blob, self.config, self.user_agent, self.proxy_bearer_header); + return Connection.init(self.network.ca_blob, self.network.config, self.network.user_agent, self.network.proxy_bearer_header); } pub fn newHeaders(self: *const Http) Headers { - return Headers.init(self.user_agent); + return Headers.init(self.network.user_agent); } pub const Connection = struct { @@ -363,87 +333,6 @@ pub const Method = enum(u8) { PATCH = 6, }; -// TODO: on BSD / Linux, we could just read the PEM file directly. -// This whole rescan + decode is really just needed for MacOS. On Linux -// bundle.rescan does find the .pem file(s) which could be in a few different -// places, so it's still useful, just not efficient. -fn loadCerts(allocator: Allocator, arena: Allocator) !c.curl_blob { - var bundle: std.crypto.Certificate.Bundle = .{}; - try bundle.rescan(allocator); - defer bundle.deinit(allocator); - - const bytes = bundle.bytes.items; - if (bytes.len == 0) { - log.warn(.app, "No system certificates", .{}); - return .{ - .len = 0, - .flags = 0, - .data = bytes.ptr, - }; - } - - const encoder = std.base64.standard.Encoder; - var arr: std.ArrayListUnmanaged(u8) = .empty; - - const encoded_size = encoder.calcSize(bytes.len); - const buffer_size = encoded_size + - (bundle.map.count() * 75) + // start / end per certificate + extra, just in case - (encoded_size / 64) // newline per 64 characters - ; - try arr.ensureTotalCapacity(arena, buffer_size); - var writer = arr.writer(arena); - - var it = bundle.map.valueIterator(); - while (it.next()) |index| { - const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*); - - try writer.writeAll("-----BEGIN CERTIFICATE-----\n"); - var line_writer = LineWriter{ .inner = writer }; - try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]); - try writer.writeAll("\n-----END CERTIFICATE-----\n"); - } - - // Final encoding should not be larger than our initial size estimate - lp.assert(buffer_size > arr.items.len, "Http loadCerts", .{ .estiate = buffer_size, .len = arr.items.len }); - - return .{ - .len = arr.items.len, - .data = arr.items.ptr, - .flags = 0, - }; -} - -// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is -// what Zig has), with lines wrapped at 64 characters and with a basic header -// and footer -const LineWriter = struct { - col: usize = 0, - inner: std.ArrayListUnmanaged(u8).Writer, - - pub fn writeAll(self: *LineWriter, data: []const u8) !void { - var writer = self.inner; - - var col = self.col; - const len = 64 - col; - - var remain = data; - if (remain.len > len) { - col = 0; - try writer.writeAll(data[0..len]); - try writer.writeByte('\n'); - remain = data[len..]; - } - - while (remain.len > 64) { - try writer.writeAll(remain[0..64]); - try writer.writeByte('\n'); - remain = data[len..]; - } - try writer.writeAll(remain); - self.col = col + remain.len; - } -}; - pub fn debugCallback(_: *c.CURL, msg_type: c.curl_infotype, raw: [*c]u8, len: usize, _: *anyopaque) callconv(.c) void { const data = raw[0..len]; switch (msg_type) { diff --git a/src/http/Network.zig b/src/http/Network.zig new file mode 100644 index 00000000..fb3f8285 --- /dev/null +++ b/src/http/Network.zig @@ -0,0 +1,163 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const lp = @import("lightpanda"); +const Allocator = std.mem.Allocator; + +const log = @import("../log.zig"); +const Config = @import("../Config.zig"); +const Http = @import("Http.zig"); + +pub const c = Http.c; + +const Network = @This(); + +allocator: Allocator, +config: *const Config, +ca_blob: ?c.curl_blob, +user_agent: [:0]const u8, +proxy_bearer_header: ?[:0]const u8, + +pub fn init(allocator: Allocator, config: *const Config) !Network { + try Http.errorCheck(c.curl_global_init(c.CURL_GLOBAL_SSL)); + errdefer c.curl_global_cleanup(); + + const user_agent = try config.userAgent(allocator); + errdefer allocator.free(user_agent); + + var proxy_bearer_header: ?[:0]const u8 = null; + if (config.proxyBearerToken()) |bt| { + proxy_bearer_header = try std.fmt.allocPrintSentinel(allocator, "Proxy-Authorization: Bearer {s}", .{bt}, 0); + } + errdefer if (proxy_bearer_header) |h| allocator.free(h); + + var ca_blob: ?c.curl_blob = null; + if (config.tlsVerifyHost()) { + ca_blob = try loadCerts(allocator); + } + + return .{ + .allocator = allocator, + .config = config, + .ca_blob = ca_blob, + .user_agent = user_agent, + .proxy_bearer_header = proxy_bearer_header, + }; +} + +pub fn deinit(self: *Network) void { + if (self.ca_blob) |ca_blob| { + const data: [*]u8 = @ptrCast(ca_blob.data); + self.allocator.free(data[0..ca_blob.len]); + } + if (self.proxy_bearer_header) |h| self.allocator.free(h); + self.allocator.free(self.user_agent); + c.curl_global_cleanup(); +} + +pub fn createHttp(self: *Network, allocator: Allocator) !Http { + return Http.init(allocator, self); +} + +// TODO: on BSD / Linux, we could just read the PEM file directly. +// This whole rescan + decode is really just needed for MacOS. On Linux +// bundle.rescan does find the .pem file(s) which could be in a few different +// places, so it's still useful, just not efficient. +fn loadCerts(allocator: Allocator) !c.curl_blob { + var bundle: std.crypto.Certificate.Bundle = .{}; + try bundle.rescan(allocator); + defer bundle.deinit(allocator); + + const bytes = bundle.bytes.items; + if (bytes.len == 0) { + log.warn(.app, "No system certificates", .{}); + return .{ + .len = 0, + .flags = 0, + .data = bytes.ptr, + }; + } + + const encoder = std.base64.standard.Encoder; + var arr: std.ArrayListUnmanaged(u8) = .empty; + + const encoded_size = encoder.calcSize(bytes.len); + const buffer_size = encoded_size + + (bundle.map.count() * 75) + // start / end per certificate + extra, just in case + (encoded_size / 64) // newline per 64 characters + ; + try arr.ensureTotalCapacity(allocator, buffer_size); + errdefer arr.deinit(allocator); + var writer = arr.writer(allocator); + + var it = bundle.map.valueIterator(); + while (it.next()) |index| { + const cert = try std.crypto.Certificate.der.Element.parse(bytes, index.*); + + try writer.writeAll("-----BEGIN CERTIFICATE-----\n"); + var line_writer = LineWriter{ .inner = writer }; + try encoder.encodeWriter(&line_writer, bytes[index.*..cert.slice.end]); + try writer.writeAll("\n-----END CERTIFICATE-----\n"); + } + + // Final encoding should not be larger than our initial size estimate + lp.assert(buffer_size > arr.items.len, "Network loadCerts", .{ .estimate = buffer_size, .len = arr.items.len }); + + // Allocate exactly the size needed and copy the data + const result = try allocator.dupe(u8, arr.items); + // Free the original oversized allocation + arr.deinit(allocator); + + return .{ + .len = result.len, + .data = result.ptr, + .flags = 0, + }; +} + +// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is +// what Zig has), with lines wrapped at 64 characters and with a basic header +// and footer +const LineWriter = struct { + col: usize = 0, + inner: std.ArrayListUnmanaged(u8).Writer, + + pub fn writeAll(self: *LineWriter, data: []const u8) !void { + var writer = self.inner; + + var col = self.col; + const len = 64 - col; + + var remain = data; + if (remain.len > len) { + col = 0; + try writer.writeAll(data[0..len]); + try writer.writeByte('\n'); + remain = data[len..]; + } + + while (remain.len > 64) { + try writer.writeAll(remain[0..64]); + try writer.writeByte('\n'); + remain = data[len..]; + } + try writer.writeAll(remain); + self.col = col + remain.len; + } +}; diff --git a/src/lightpanda.zig b/src/lightpanda.zig index b3ca22c2..dd135962 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -20,6 +20,7 @@ const std = @import("std"); pub const App = @import("App.zig"); pub const Server = @import("Server.zig"); pub const Config = @import("Config.zig"); +pub const ThreadPool = @import("ThreadPool.zig"); pub const Page = @import("browser/Page.zig"); pub const Browser = @import("browser/Browser.zig"); pub const Session = @import("browser/Session.zig"); @@ -37,8 +38,11 @@ pub const FetchOpts = struct { dump: dump.RootOpts, writer: ?*std.Io.Writer = null, }; -pub fn fetch(app: *App, url: [:0]const u8, opts: FetchOpts) !void { - var browser = try Browser.init(app); +pub fn fetch(allocator: std.mem.Allocator, app: *App, url: [:0]const u8, opts: FetchOpts) !void { + var http = try app.network.createHttp(allocator); + defer http.deinit(); + + var browser = try Browser.init(allocator, app, http.client); defer browser.deinit(); var session = try browser.newSession(); diff --git a/src/main.zig b/src/main.zig index 9b849dc4..ad3adf4b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -80,7 +80,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo // _app is global to handle graceful shutdown. var app = try App.init(allocator, &args); - defer app.deinit(); + defer app.deinit(allocator); app.telemetry.record(.{ .run = {} }); switch (args.mode) { @@ -92,7 +92,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo }; // _server is global to handle graceful shutdown. - var server = try lp.Server.init(app, address); + var server = try lp.Server.init(allocator, app, address); defer server.deinit(); try sighandler.on(lp.Server.stop, .{&server}); @@ -122,7 +122,7 @@ fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !vo fetch_opts.writer = &writer.interface; } - lp.fetch(app, url, fetch_opts) catch |err| { + lp.fetch(allocator, app, url, fetch_opts) catch |err| { log.fatal(.app, "fetch error", .{ .err = err, .url = url }); return err; }; diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index 5309f98b..06cc4d6f 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -42,12 +42,15 @@ pub fn main() !void { .exec_name = "legacy-test", }; var app = try lp.App.init(allocator, &config); - defer app.deinit(); + defer app.deinit(allocator); var test_arena = std.heap.ArenaAllocator.init(allocator); defer test_arena.deinit(); - var browser = try lp.Browser.init(app); + var http = try app.network.createHttp(allocator); + defer http.deinit(); + + var browser = try lp.Browser.init(allocator, app, http.client); defer browser.deinit(); const session = try browser.newSession(); diff --git a/src/main_wpt.zig b/src/main_wpt.zig index ce381479..f5b4055f 100644 --- a/src/main_wpt.zig +++ b/src/main_wpt.zig @@ -68,9 +68,12 @@ pub fn main() !void { .exec_name = "lightpanda-wpt", }; var app = try lp.App.init(allocator, &config); - defer app.deinit(); + defer app.deinit(allocator); - var browser = try lp.Browser.init(app); + var http = try app.network.createHttp(allocator); + defer http.deinit(); + + var browser = try lp.Browser.init(allocator, app, http.client); defer browser.deinit(); // An arena for running each tests. Is reset after every test. diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index d05929fb..c860732a 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -20,18 +20,21 @@ pub const LightPanda = struct { allocator: Allocator, mutex: std.Thread.Mutex, cond: Thread.Condition, + http: Http, connection: Http.Connection, pending: std.DoublyLinkedList, mem_pool: std.heap.MemoryPool(LightPandaEvent), - pub fn init(app: *App) !LightPanda { - const connection = try app.http.newConnection(); + pub fn init(allocator: Allocator, app: *App) !LightPanda { + var http = try app.network.createHttp(allocator); + errdefer http.deinit(); + + const connection = try http.newConnection(); errdefer connection.deinit(); try connection.setURL(URL); try connection.setMethod(.POST); - const allocator = app.allocator; return .{ .cond = .{}, .mutex = .{}, @@ -39,6 +42,7 @@ pub const LightPanda = struct { .thread = null, .running = true, .allocator = allocator, + .http = http, .connection = connection, .mem_pool = std.heap.MemoryPool(LightPandaEvent).init(allocator), }; @@ -54,6 +58,7 @@ pub const LightPanda = struct { } self.mem_pool.deinit(); self.connection.deinit(); + self.http.deinit(); } pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index ff0d241d..885b4452 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -34,13 +34,13 @@ fn TelemetryT(comptime P: type) type { const Self = @This(); - pub fn init(app: *App, run_mode: Config.RunMode) !Self { + pub fn init(allocator: Allocator, app: *App, run_mode: Config.RunMode) !Self { const disabled = isDisabled(); if (builtin.mode != .Debug and builtin.is_test == false) { log.info(.telemetry, "telemetry status", .{ .disabled = disabled }); } - const provider = try P.init(app); + const provider = try P.init(allocator, app); errdefer provider.deinit(); return .{ @@ -142,7 +142,7 @@ pub const Event = union(enum) { }; const NoopProvider = struct { - fn init(_: *App) !NoopProvider { + fn init(_: Allocator, _: *App) !NoopProvider { return .{}; } fn deinit(_: NoopProvider) void {} @@ -158,7 +158,7 @@ test "telemetry: disabled by environment" { defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); const FailingProvider = struct { - fn init(_: *App) !@This() { + fn init(_: Allocator, _: *App) !@This() { return .{}; } fn deinit(_: @This()) void {} @@ -167,7 +167,7 @@ test "telemetry: disabled by environment" { } }; - var telemetry = try TelemetryT(FailingProvider).init(undefined, .serve); + var telemetry = try TelemetryT(FailingProvider).init(std.testing.allocator, undefined, .serve); defer telemetry.deinit(); telemetry.record(.{ .run = {} }); } @@ -191,7 +191,7 @@ test "telemetry: getOrCreateId" { } test "telemetry: sends event to provider" { - var telemetry = try TelemetryT(MockProvider).init(testing.test_app, .serve); + var telemetry = try TelemetryT(MockProvider).init(std.testing.allocator, testing.test_app, .serve); defer telemetry.deinit(); const mock = &telemetry.provider; @@ -211,12 +211,12 @@ const MockProvider = struct { allocator: Allocator, events: std.ArrayListUnmanaged(Event), - fn init(app: *App) !@This() { + fn init(allocator: Allocator, _: *App) !@This() { return .{ .iid = null, .run_mode = null, .events = .{}, - .allocator = app.allocator, + .allocator = allocator, }; } fn deinit(self: *MockProvider) void { diff --git a/src/testing.zig b/src/testing.zig index db808498..9d1a4f64 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -333,6 +333,7 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool { } pub var test_app: *App = undefined; +pub var test_http: App.Http = undefined; pub var test_browser: Browser = undefined; pub var test_session: *Session = undefined; @@ -465,9 +466,12 @@ test "tests:beforeAll" { log.opts.format = .pretty; test_app = try App.init(@import("root").tracking_allocator, &test_config); - errdefer test_app.deinit(); + errdefer test_app.deinit(@import("root").tracking_allocator); - test_browser = try Browser.init(test_app); + test_http = try test_app.network.createHttp(@import("root").tracking_allocator); + errdefer test_http.deinit(); + + test_browser = try Browser.init(@import("root").tracking_allocator, test_app, test_http.client); errdefer test_browser.deinit(); test_session = try test_browser.newSession(); @@ -502,14 +506,16 @@ test "tests:afterAll" { @import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size; test_browser.deinit(); - test_app.deinit(); + test_http.deinit(); + test_app.deinit(@import("root").tracking_allocator); } fn serveCDP(wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9583); - test_cdp_server = try Server.init(test_app, address); + const test_allocator = @import("root").tracking_allocator; + test_cdp_server = try Server.init(test_allocator, test_app, address); - var server = try Server.init(test_app, address); + var server = try Server.init(test_allocator, test_app, address); defer server.deinit(); wg.finish();