diff --git a/build.zig b/build.zig index 7d97728a..00f5a322 100644 --- a/build.zig +++ b/build.zig @@ -35,7 +35,7 @@ pub fn build(b: *Build) !void { opts.addOption([]const u8, "git_commit", git_commit orelse "dev"); opts.addOption(?[]const u8, "snapshot_path", snapshot_path); - const enable_tsan = b.option(bool, "tsan", "Enable Thread Sanitizer"); + const enable_tsan = b.option(bool, "tsan", "Enable Thread Sanitizer") orelse false; const enable_csan = b.option(std.zig.SanitizeC, "csan", "Enable C Sanitizers"); const lightpanda_module = blk: { @@ -50,7 +50,7 @@ pub fn build(b: *Build) !void { }); mod.addImport("lightpanda", mod); // allow circular "lightpanda" import - try addDependencies(b, mod, opts, prebuilt_v8_path); + try addDependencies(b, mod, opts, prebuilt_v8_path, enable_tsan); break :blk mod; }; @@ -170,7 +170,7 @@ pub fn build(b: *Build) !void { } } -fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, prebuilt_v8_path: ?[]const u8) !void { +fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, prebuilt_v8_path: ?[]const u8, is_tsan: bool) !void { mod.addImport("build_config", opts.createModule()); const target = mod.resolved_target.?; @@ -179,6 +179,8 @@ fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, pre .optimize = mod.optimize.?, .prebuilt_v8_path = prebuilt_v8_path, .cache_root = b.pathFromRoot(".lp-cache"), + .is_tsan = is_tsan, + .v8_enable_sandbox = is_tsan, // v8 contains a bug and cannot be compiled with tsan without a sandbox. }; mod.addIncludePath(b.path("vendor/lightpanda")); diff --git a/build.zig.zon b/build.zig.zon index f32868b8..949ea9ce 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -5,11 +5,11 @@ .fingerprint = 0xda130f3af836cea0, // Changing this has security and trust implications. .minimum_zig_version = "0.15.2", .dependencies = .{ - .v8 = .{ - .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/v0.2.6.tar.gz", - .hash = "v8-0.0.0-xddH60NRBAAWmpZq9nWdfFAEqVJ9zqJnvr1Nl9m2AbcY", - }, - //.v8 = .{ .path = "../zig-v8-fork" }, + // .v8 = .{ + // .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/v0.2.6.tar.gz", + // .hash = "v8-0.0.0-xddH60NRBAAWmpZq9nWdfFAEqVJ9zqJnvr1Nl9m2AbcY", + // }, + .v8 = .{ .path = "../zig-v8-fork" }, .@"boringssl-zig" = .{ .url = "git+https://github.com/Syndica/boringssl-zig.git#c53df00d06b02b755ad88bbf4d1202ed9687b096", .hash = "boringssl-0.1.0-VtJeWehMAAA4RNnwRnzEvKcS9rjsR1QVRw1uJrwXxmVK", diff --git a/src/ArenaPool.zig b/src/ArenaPool.zig index a42e0d42..eb92c6b8 100644 --- a/src/ArenaPool.zig +++ b/src/ArenaPool.zig @@ -29,7 +29,7 @@ free_list_len: u16 = 0, free_list: ?*Entry = null, free_list_max: u16, entry_pool: std.heap.MemoryPool(Entry), -mutex: std.Thread.Mutex = .{}, +mutex: std.Thread.Mutex, const Entry = struct { next: ?*Entry, diff --git a/src/Server.zig b/src/Server.zig index f4a6b810..6155186e 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -32,16 +32,20 @@ const Config = @import("Config.zig"); const CDP = @import("cdp/cdp.zig").CDP; const Http = @import("http/Http.zig"); const HttpClient = @import("http/Client.zig"); -const ThreadPool = @import("ThreadPool.zig"); const Server = @This(); app: *App, -shutdown: bool = false, +shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, listener: ?posix.socket_t, json_version_response: []const u8, -thread_pool: ThreadPool, + +// Thread management +active_threads: std.atomic.Value(u32) = .init(0), +clients: std.ArrayListUnmanaged(*Client) = .{}, +clients_mu: std.Thread.Mutex = .{}, +clients_pool: std.heap.MemoryPool(Client), pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server { const json_version_response = try buildJSONVersionResponse(allocator, address); @@ -52,16 +56,23 @@ pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server { .listener = null, .allocator = allocator, .json_version_response = json_version_response, - .thread_pool = ThreadPool.init(allocator, app.config.maxConnections()), + .clients_pool = std.heap.MemoryPool(Client).init(allocator), }; } /// Interrupts the server so that main can complete normally and call all defer handlers. pub fn stop(self: *Server) void { - if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { + if (self.shutdown.swap(true, .release)) { return; } + // Shutdown all active clients + self.clients_mu.lock(); + for (self.clients.items) |client| { + client.stop(); + } + self.clients_mu.unlock(); + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). @@ -78,16 +89,18 @@ pub fn stop(self: *Server) void { } pub fn deinit(self: *Server) void { - self.thread_pool.deinit(); + self.joinThreads(); if (self.listener) |listener| { posix.close(listener); self.listener = null; } + self.clients.deinit(self.allocator); + self.clients_pool.deinit(); self.allocator.free(self.json_version_response); } pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); self.listener = listener; @@ -100,13 +113,17 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { try posix.listen(listener, 1); log.info(.app, "server running", .{ .address = address }); - while (!@atomicLoad(bool, &self.shutdown, .monotonic)) { + while (!self.shutdown.load(.acquire)) { const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { switch (err) { error.SocketNotListening, error.ConnectionAborted => { log.info(.app, "server stopped", .{}); break; }, + error.WouldBlock => { + std.Thread.sleep(10 * std.time.ns_per_ms); + continue; + }, else => { log.err(.app, "CDP accept", .{ .err = err }); std.Thread.sleep(std.time.ns_per_s); @@ -115,27 +132,23 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } }; - self.thread_pool.spawn(handleConnection, .{ self, socket, timeout_ms }, shutdownConnection, .{socket}) catch |err| { + self.spawnWorker(socket, timeout_ms) catch |err| { log.err(.app, "CDP spawn", .{ .err = err }); posix.close(socket); }; } } -fn shutdownConnection(socket: posix.socket_t) void { - posix.shutdown(socket, .recv) catch {}; -} - fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { defer posix.close(socket); // Client is HUGE (> 512KB) because it has a large read buffer. // V8 crashes if this is on the stack (likely related to its size). - const client = self.allocator.create(Client) catch |err| { + const client = self.getClient() catch |err| { log.err(.app, "CDP client create", .{ .err = err }); return; }; - defer self.allocator.destroy(client); + defer self.releaseClient(client); client.* = Client.init( socket, @@ -149,7 +162,66 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void }; defer client.deinit(); - client.run(); + self.registerClient(client); + defer self.unregisterClient(client); + + client.start(); +} + +fn getClient(self: *Server) !*Client { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + return self.clients_pool.create(); +} + +fn releaseClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + self.clients_pool.destroy(client); +} + +fn registerClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + self.clients.append(self.allocator, client) catch {}; +} + +fn unregisterClient(self: *Server, client: *Client) void { + self.clients_mu.lock(); + defer self.clients_mu.unlock(); + for (self.clients.items, 0..) |c, i| { + if (c == client) { + _ = self.clients.swapRemove(i); + break; + } + } +} + +fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { + if (self.shutdown.load(.acquire)) { + return error.ShuttingDown; + } + + if (self.active_threads.load(.monotonic) >= self.app.config.maxConnections()) { + return error.MaxThreadsReached; + } + + _ = self.active_threads.fetchAdd(1, .monotonic); + errdefer _ = self.active_threads.fetchSub(1, .monotonic); + + const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms }); + thread.detach(); +} + +fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer _ = self.active_threads.fetchSub(1, .monotonic); + handleConnection(self, socket, timeout_ms); +} + +fn joinThreads(self: *Server) void { + while (self.active_threads.load(.monotonic) > 0) { + std.Thread.sleep(10 * std.time.ns_per_ms); + } } // Handle exactly one TCP connection. @@ -229,7 +301,7 @@ pub const Client = struct { self.http.deinit(); } - fn run(self: *Client) void { + fn start(self: *Client) void { const http = self.http; http.cdp_client = .{ .socket = self.socket, @@ -245,6 +317,10 @@ pub const Client = struct { }; } + fn stop(self: *Client) void { + posix.shutdown(self.socket, .recv) catch {}; + } + fn httpLoop(self: *Client, http: *HttpClient) !void { lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); while (true) { diff --git a/src/TestHTTPServer.zig b/src/TestHTTPServer.zig index 9750a396..e048acaa 100644 --- a/src/TestHTTPServer.zig +++ b/src/TestHTTPServer.zig @@ -20,7 +20,7 @@ const std = @import("std"); const TestHTTPServer = @This(); -shutdown: bool, +shutdown: std.atomic.Value(bool), listener: ?std.net.Server, handler: Handler, @@ -28,33 +28,44 @@ const Handler = *const fn (req: *std.http.Server.Request) anyerror!void; pub fn init(handler: Handler) TestHTTPServer { return .{ - .shutdown = true, + .shutdown = .init(false), .listener = null, .handler = handler, }; } -pub fn deinit(self: *TestHTTPServer) void { - self.shutdown = true; +pub fn stop(self: *TestHTTPServer) void { + self.shutdown.store(true, .release); if (self.listener) |*listener| { - listener.deinit(); + // Close the socket to unblock accept(), but don't call deinit() + // which does memset and causes a data race with the running thread. + std.posix.close(listener.stream.handle); } } +pub fn deinit(self: *TestHTTPServer) void { + self.listener = null; +} + pub fn run(self: *TestHTTPServer, wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9582); self.listener = try address.listen(.{ .reuse_address = true }); var listener = &self.listener.?; + // Make listener nonblocking so accept() doesn't block indefinitely + _ = try std.posix.fcntl(listener.stream.handle, std.posix.F.SETFL, @as(u32, @bitCast(std.posix.O{ .NONBLOCK = true }))); + wg.finish(); - while (true) { + while (!self.shutdown.load(.acquire)) { const conn = listener.accept() catch |err| { - if (self.shutdown) { - return; + if (err == error.WouldBlock) { + std.Thread.sleep(10 * std.time.ns_per_ms); + continue; } - return err; + // Socket was closed in stop() + return; }; const thrd = try std.Thread.spawn(.{}, handleConnection, .{ self, conn }); thrd.detach(); diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig deleted file mode 100644 index 55642948..00000000 --- a/src/ThreadPool.zig +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright (C) 2023-2025 Lightpanda (Selecy SAS) -// -// Francis Bouvier -// Pierre Tachoire -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -const std = @import("std"); -const Allocator = std.mem.Allocator; - -const log = @import("log.zig"); - -const ThreadPool = @This(); - -allocator: Allocator, - -active: u16, -shutdown: bool, -max_threads: u16, - -lock: std.Thread.RwLock, -threads: std.DoublyLinkedList, - -const Func = struct { - ptr: *const fn (*anyopaque) void, - args: []u8, - alignment: std.mem.Alignment, - - fn init(allocator: Allocator, func: anytype, args: anytype) !Func { - const Args = @TypeOf(args); - const Wrapper = struct { - fn call(ctx: *anyopaque) void { - const a: *Args = @ptrCast(@alignCast(ctx)); - @call(.auto, func, a.*); - } - }; - - const alignment: std.mem.Alignment = .of(Args); - const size = @sizeOf(Args); - - if (size == 0) { - return .{ - .ptr = Wrapper.call, - .args = &.{}, - .alignment = alignment, - }; - } - - const args_buf = try allocator.alignedAlloc(u8, alignment, size); - - const bytes: []const u8 = @ptrCast((&args)[0..1]); - @memcpy(args_buf, bytes); - - return .{ - .ptr = Wrapper.call, - .args = args_buf, - .alignment = alignment, - }; - } - - fn call(self: Func) void { - self.ptr(@ptrCast(self.args.ptr)); - } - - fn free(self: Func, allocator: Allocator) void { - if (self.args.len > 0) { - allocator.rawFree(self.args, self.alignment, @returnAddress()); - } - } -}; - -const Worker = struct { - run_fn: Func, - shutdown_fn: Func, - pool: *ThreadPool, - thread: std.Thread, - node: std.DoublyLinkedList.Node, - - fn run(self: *Worker) void { - self.run_fn.call(); - self.deinit(); - } - - fn deinit(self: *Worker) void { - const pool = self.pool; - - pool.lock.lock(); - pool.threads.remove(&self.node); - pool.active -= 1; - pool.lock.unlock(); - - self.run_fn.free(pool.allocator); - self.shutdown_fn.free(pool.allocator); - pool.allocator.destroy(self); - } - - fn callShutdown(self: *Worker) void { - self.shutdown_fn.call(); - } -}; - -pub fn init(allocator: Allocator, max_threads: u16) ThreadPool { - return .{ - .allocator = allocator, - .max_threads = max_threads, - .active = 0, - .shutdown = false, - .threads = .{}, - .lock = .{}, - }; -} - -pub fn deinit(self: *ThreadPool) void { - self.join(); -} - -/// Spawn a thread to run run_func(run_args). shutdown_func is called during join(). -pub fn spawn( - self: *ThreadPool, - run_func: anytype, - run_args: std.meta.ArgsTuple(@TypeOf(run_func)), - shutdown_func: anytype, - shutdown_args: std.meta.ArgsTuple(@TypeOf(shutdown_func)), -) !void { - const run_fn = try Func.init(self.allocator, run_func, run_args); - errdefer run_fn.free(self.allocator); - - const shutdown_fn = try Func.init(self.allocator, shutdown_func, shutdown_args); - errdefer shutdown_fn.free(self.allocator); - - const worker = try self.allocator.create(Worker); - errdefer self.allocator.destroy(worker); - - worker.* = .{ - .run_fn = run_fn, - .shutdown_fn = shutdown_fn, - .pool = self, - .thread = undefined, - .node = .{}, - }; - - self.lock.lock(); - defer self.lock.unlock(); - - if (self.shutdown) { - return error.PoolShuttingDown; - } - - if (self.active >= self.max_threads) { - return error.MaxThreadsReached; - } - - self.threads.append(&worker.node); - self.active += 1; - - worker.thread = std.Thread.spawn(.{}, Worker.run, .{worker}) catch |err| { - self.threads.remove(&worker.node); - self.active -= 1; - return err; - }; -} - -/// Number of active threads. -pub fn count(self: *ThreadPool) u16 { - self.lock.lockShared(); - defer self.lock.unlockShared(); - return self.active; -} - -/// Wait for all threads to finish. -pub fn join(self: *ThreadPool) void { - self.lock.lock(); - self.shutdown = true; - - // Call shutdown on all active workers - var node = self.threads.first; - while (node) |n| { - const worker: *Worker = @fieldParentPtr("node", n); - worker.callShutdown(); - node = n.next; - } - self.lock.unlock(); - - while (true) { - self.lock.lockShared(); - const active = self.active; - self.lock.unlockShared(); - - if (active == 0) break; - std.Thread.sleep(10 * std.time.ns_per_ms); - } -} - -pub fn isShuttingDown(self: *ThreadPool) bool { - self.lock.lockShared(); - defer self.lock.unlockShared(); - return self.shutdown; -} - -// Tests -const testing = std.testing; - -fn noop() void {} - -fn increment(counter: *std.atomic.Value(u32)) void { - _ = counter.fetchAdd(1, .acq_rel); -} - -fn block(flag: *std.atomic.Value(bool)) void { - while (!flag.load(.acquire)) { - std.Thread.sleep(1 * std.time.ns_per_ms); - } -} - -fn unblock(flag: *std.atomic.Value(bool)) void { - flag.store(true, .release); -} - -test "ThreadPool: spawn and join" { - var counter = std.atomic.Value(u32).init(0); - var pool = ThreadPool.init(testing.allocator, 4); - defer pool.deinit(); - - try pool.spawn(increment, .{&counter}, noop, .{}); - try pool.spawn(increment, .{&counter}, noop, .{}); - try pool.spawn(increment, .{&counter}, noop, .{}); - - pool.join(); - - try testing.expectEqual(@as(u32, 3), counter.load(.acquire)); - try testing.expectEqual(@as(u16, 0), pool.count()); -} - -test "ThreadPool: max threads limit" { - var flag = std.atomic.Value(bool).init(false); - var pool = ThreadPool.init(testing.allocator, 2); - defer pool.deinit(); - - try pool.spawn(block, .{&flag}, unblock, .{&flag}); - try pool.spawn(block, .{&flag}, unblock, .{&flag}); - - try testing.expectError(error.MaxThreadsReached, pool.spawn(block, .{&flag}, unblock, .{&flag})); - try testing.expectEqual(@as(u16, 2), pool.count()); - - // deinit will call unblock via shutdown callback -} diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index 69af00a5..76592f89 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -24,7 +24,6 @@ const ArenaAllocator = std.heap.ArenaAllocator; const js = @import("js/js.zig"); const log = @import("../log.zig"); const App = @import("../App.zig"); -const Http = @import("../http/Http.zig"); const HttpClient = @import("../http/Client.zig"); const ArenaPool = App.ArenaPool; @@ -40,8 +39,8 @@ const Session = @import("Session.zig"); const Browser = @This(); env: js.Env, -app: *App, http_client: *HttpClient, +user_agent: []const u8, session: ?Session, allocator: Allocator, arena_pool: *ArenaPool, @@ -61,9 +60,9 @@ pub fn init(allocator: Allocator, app: *App, http_client: *HttpClient) !Browser errdefer notification.deinit(); return .{ - .app = app, .env = env, .http_client = http_client, + .user_agent = app.http.user_agent, .session = null, .allocator = allocator, .notification = notification, diff --git a/src/browser/Page.zig b/src/browser/Page.zig index fc7f353a..69788de4 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -277,7 +277,6 @@ fn reset(self: *Page, comptime initializing: bool) !void { self._arena_pool_leak_track.clearRetainingCapacity(); } - // We force a garbage collection between page navigations to keep v8 // memory usage as low as possible. self._session.browser.env.memoryPressureNotification(.moderate); diff --git a/src/browser/webapi/Navigator.zig b/src/browser/webapi/Navigator.zig index c095ff65..5990f054 100644 --- a/src/browser/webapi/Navigator.zig +++ b/src/browser/webapi/Navigator.zig @@ -27,7 +27,7 @@ _pad: bool = false, pub const init: Navigator = .{}; pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 { - return page._session.browser.app.http.user_agent; + return page._session.browser.user_agent; } pub fn getAppName(_: *const Navigator) []const u8 { diff --git a/src/lightpanda.zig b/src/lightpanda.zig index aa91b023..4a58837c 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -20,7 +20,6 @@ const std = @import("std"); pub const App = @import("App.zig"); pub const Server = @import("Server.zig"); pub const Config = @import("Config.zig"); -pub const ThreadPool = @import("ThreadPool.zig"); pub const Page = @import("browser/Page.zig"); pub const Browser = @import("browser/Browser.zig"); pub const Session = @import("browser/Session.zig"); diff --git a/src/test_runner.zig b/src/test_runner.zig index 2461c82f..1c38cae1 100644 --- a/src/test_runner.zig +++ b/src/test_runner.zig @@ -442,6 +442,7 @@ pub const TrackingAllocator = struct { allocated_bytes: usize = 0, allocation_count: usize = 0, reallocation_count: usize = 0, + mutex: std.Thread.Mutex = .{}, const Stats = struct { allocated_bytes: usize, @@ -479,6 +480,8 @@ pub const TrackingAllocator = struct { return_address: usize, ) ?[*]u8 { const self: *TrackingAllocator = @ptrCast(@alignCast(ctx)); + self.mutex.lock(); + defer self.mutex.unlock(); const result = self.parent_allocator.rawAlloc(len, alignment, return_address); self.allocation_count += 1; self.allocated_bytes += len; @@ -493,6 +496,8 @@ pub const TrackingAllocator = struct { ra: usize, ) bool { const self: *TrackingAllocator = @ptrCast(@alignCast(ctx)); + self.mutex.lock(); + defer self.mutex.unlock(); const result = self.parent_allocator.rawResize(old_mem, alignment, new_len, ra); self.reallocation_count += 1; // TODO: only if result is not null? return result; @@ -505,6 +510,8 @@ pub const TrackingAllocator = struct { ra: usize, ) void { const self: *TrackingAllocator = @ptrCast(@alignCast(ctx)); + self.mutex.lock(); + defer self.mutex.unlock(); self.parent_allocator.rawFree(old_mem, alignment, ra); self.free_count += 1; } @@ -517,6 +524,8 @@ pub const TrackingAllocator = struct { ret_addr: usize, ) ?[*]u8 { const self: *TrackingAllocator = @ptrCast(@alignCast(ctx)); + self.mutex.lock(); + defer self.mutex.unlock(); const result = self.parent_allocator.rawRemap(memory, alignment, new_len, ret_addr); self.reallocation_count += 1; // TODO: only if result is not null? return result; diff --git a/src/testing.zig b/src/testing.zig index a8925dd5..689ae656 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -450,7 +450,9 @@ const TestHTTPServer = @import("TestHTTPServer.zig"); const Server = @import("Server.zig"); var test_cdp_server: ?Server = null; +var test_cdp_server_thread: ?std.Thread = null; var test_http_server: ?TestHTTPServer = null; +var test_http_server_thread: ?std.Thread = null; const test_config = Config{ .mode = .{ .serve = .{ @@ -480,16 +482,10 @@ test "tests:beforeAll" { var wg: std.Thread.WaitGroup = .{}; wg.startMany(2); - { - const thread = try std.Thread.spawn(.{}, serveCDP, .{&wg}); - thread.detach(); - } + test_cdp_server_thread = try std.Thread.spawn(.{}, serveCDP, .{&wg}); test_http_server = TestHTTPServer.init(testHTTPHandler); - { - const thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg }); - thread.detach(); - } + test_http_server_thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg }); // need to wait for the servers to be listening, else tests will fail because // they aren't able to connect. @@ -497,9 +493,21 @@ test "tests:beforeAll" { } test "tests:afterAll" { + if (test_cdp_server) |*server| { + server.stop(); + } + if (test_cdp_server_thread) |thread| { + thread.join(); + } if (test_cdp_server) |*server| { server.deinit(); } + if (test_http_server) |*server| { + server.stop(); + } + if (test_http_server_thread) |thread| { + thread.join(); + } if (test_http_server) |*server| { server.deinit(); } @@ -515,9 +523,6 @@ fn serveCDP(wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9583); const test_allocator = @import("root").tracking_allocator; test_cdp_server = try Server.init(test_allocator, test_app, address); - - var server = try Server.init(test_allocator, test_app, address); - defer server.deinit(); wg.finish(); test_cdp_server.?.run(address, 5) catch |err| {