Cleanup threads manager

This commit is contained in:
Nikolay Govorov
2026-01-29 08:39:32 +00:00
parent c2ba6851fa
commit 85742dd7eb
12 changed files with 152 additions and 309 deletions

View File

@@ -35,7 +35,7 @@ pub fn build(b: *Build) !void {
opts.addOption([]const u8, "git_commit", git_commit orelse "dev");
opts.addOption(?[]const u8, "snapshot_path", snapshot_path);
const enable_tsan = b.option(bool, "tsan", "Enable Thread Sanitizer");
const enable_tsan = b.option(bool, "tsan", "Enable Thread Sanitizer") orelse false;
const enable_csan = b.option(std.zig.SanitizeC, "csan", "Enable C Sanitizers");
const lightpanda_module = blk: {
@@ -50,7 +50,7 @@ pub fn build(b: *Build) !void {
});
mod.addImport("lightpanda", mod); // allow circular "lightpanda" import
try addDependencies(b, mod, opts, prebuilt_v8_path);
try addDependencies(b, mod, opts, prebuilt_v8_path, enable_tsan);
break :blk mod;
};
@@ -170,7 +170,7 @@ pub fn build(b: *Build) !void {
}
}
fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, prebuilt_v8_path: ?[]const u8) !void {
fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, prebuilt_v8_path: ?[]const u8, is_tsan: bool) !void {
mod.addImport("build_config", opts.createModule());
const target = mod.resolved_target.?;
@@ -179,6 +179,8 @@ fn addDependencies(b: *Build, mod: *Build.Module, opts: *Build.Step.Options, pre
.optimize = mod.optimize.?,
.prebuilt_v8_path = prebuilt_v8_path,
.cache_root = b.pathFromRoot(".lp-cache"),
.is_tsan = is_tsan,
.v8_enable_sandbox = is_tsan, // v8 contains a bug and cannot be compiled with tsan without a sandbox.
};
mod.addIncludePath(b.path("vendor/lightpanda"));

View File

@@ -5,11 +5,11 @@
.fingerprint = 0xda130f3af836cea0, // Changing this has security and trust implications.
.minimum_zig_version = "0.15.2",
.dependencies = .{
.v8 = .{
.url = "https://github.com/lightpanda-io/zig-v8-fork/archive/v0.2.6.tar.gz",
.hash = "v8-0.0.0-xddH60NRBAAWmpZq9nWdfFAEqVJ9zqJnvr1Nl9m2AbcY",
},
//.v8 = .{ .path = "../zig-v8-fork" },
// .v8 = .{
// .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/v0.2.6.tar.gz",
// .hash = "v8-0.0.0-xddH60NRBAAWmpZq9nWdfFAEqVJ9zqJnvr1Nl9m2AbcY",
// },
.v8 = .{ .path = "../zig-v8-fork" },
.@"boringssl-zig" = .{
.url = "git+https://github.com/Syndica/boringssl-zig.git#c53df00d06b02b755ad88bbf4d1202ed9687b096",
.hash = "boringssl-0.1.0-VtJeWehMAAA4RNnwRnzEvKcS9rjsR1QVRw1uJrwXxmVK",

View File

@@ -29,7 +29,7 @@ free_list_len: u16 = 0,
free_list: ?*Entry = null,
free_list_max: u16,
entry_pool: std.heap.MemoryPool(Entry),
mutex: std.Thread.Mutex = .{},
mutex: std.Thread.Mutex,
const Entry = struct {
next: ?*Entry,

View File

@@ -32,16 +32,20 @@ const Config = @import("Config.zig");
const CDP = @import("cdp/cdp.zig").CDP;
const Http = @import("http/Http.zig");
const HttpClient = @import("http/Client.zig");
const ThreadPool = @import("ThreadPool.zig");
const Server = @This();
app: *App,
shutdown: bool = false,
shutdown: std.atomic.Value(bool) = .init(false),
allocator: Allocator,
listener: ?posix.socket_t,
json_version_response: []const u8,
thread_pool: ThreadPool,
// Thread management
active_threads: std.atomic.Value(u32) = .init(0),
clients: std.ArrayListUnmanaged(*Client) = .{},
clients_mu: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client),
pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server {
const json_version_response = try buildJSONVersionResponse(allocator, address);
@@ -52,16 +56,23 @@ pub fn init(allocator: Allocator, app: *App, address: net.Address) !Server {
.listener = null,
.allocator = allocator,
.json_version_response = json_version_response,
.thread_pool = ThreadPool.init(allocator, app.config.maxConnections()),
.clients_pool = std.heap.MemoryPool(Client).init(allocator),
};
}
/// Interrupts the server so that main can complete normally and call all defer handlers.
pub fn stop(self: *Server) void {
if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) {
if (self.shutdown.swap(true, .release)) {
return;
}
// Shutdown all active clients
self.clients_mu.lock();
for (self.clients.items) |client| {
client.stop();
}
self.clients_mu.unlock();
// Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF).
@@ -78,16 +89,18 @@ pub fn stop(self: *Server) void {
}
pub fn deinit(self: *Server) void {
self.thread_pool.deinit();
self.joinThreads();
if (self.listener) |listener| {
posix.close(listener);
self.listener = null;
}
self.clients.deinit(self.allocator);
self.clients_pool.deinit();
self.allocator.free(self.json_version_response);
}
pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
self.listener = listener;
@@ -100,13 +113,17 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
try posix.listen(listener, 1);
log.info(.app, "server running", .{ .address = address });
while (!@atomicLoad(bool, &self.shutdown, .monotonic)) {
while (!self.shutdown.load(.acquire)) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
switch (err) {
error.SocketNotListening, error.ConnectionAborted => {
log.info(.app, "server stopped", .{});
break;
},
error.WouldBlock => {
std.Thread.sleep(10 * std.time.ns_per_ms);
continue;
},
else => {
log.err(.app, "CDP accept", .{ .err = err });
std.Thread.sleep(std.time.ns_per_s);
@@ -115,27 +132,23 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
}
};
self.thread_pool.spawn(handleConnection, .{ self, socket, timeout_ms }, shutdownConnection, .{socket}) catch |err| {
self.spawnWorker(socket, timeout_ms) catch |err| {
log.err(.app, "CDP spawn", .{ .err = err });
posix.close(socket);
};
}
}
fn shutdownConnection(socket: posix.socket_t) void {
posix.shutdown(socket, .recv) catch {};
}
fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer posix.close(socket);
// Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = self.allocator.create(Client) catch |err| {
const client = self.getClient() catch |err| {
log.err(.app, "CDP client create", .{ .err = err });
return;
};
defer self.allocator.destroy(client);
defer self.releaseClient(client);
client.* = Client.init(
socket,
@@ -149,7 +162,66 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void
};
defer client.deinit();
client.run();
self.registerClient(client);
defer self.unregisterClient(client);
client.start();
}
fn getClient(self: *Server) !*Client {
self.clients_mu.lock();
defer self.clients_mu.unlock();
return self.clients_pool.create();
}
fn releaseClient(self: *Server, client: *Client) void {
self.clients_mu.lock();
defer self.clients_mu.unlock();
self.clients_pool.destroy(client);
}
fn registerClient(self: *Server, client: *Client) void {
self.clients_mu.lock();
defer self.clients_mu.unlock();
self.clients.append(self.allocator, client) catch {};
}
fn unregisterClient(self: *Server, client: *Client) void {
self.clients_mu.lock();
defer self.clients_mu.unlock();
for (self.clients.items, 0..) |c, i| {
if (c == client) {
_ = self.clients.swapRemove(i);
break;
}
}
}
fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
if (self.shutdown.load(.acquire)) {
return error.ShuttingDown;
}
if (self.active_threads.load(.monotonic) >= self.app.config.maxConnections()) {
return error.MaxThreadsReached;
}
_ = self.active_threads.fetchAdd(1, .monotonic);
errdefer _ = self.active_threads.fetchSub(1, .monotonic);
const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms });
thread.detach();
}
fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer _ = self.active_threads.fetchSub(1, .monotonic);
handleConnection(self, socket, timeout_ms);
}
fn joinThreads(self: *Server) void {
while (self.active_threads.load(.monotonic) > 0) {
std.Thread.sleep(10 * std.time.ns_per_ms);
}
}
// Handle exactly one TCP connection.
@@ -229,7 +301,7 @@ pub const Client = struct {
self.http.deinit();
}
fn run(self: *Client) void {
fn start(self: *Client) void {
const http = self.http;
http.cdp_client = .{
.socket = self.socket,
@@ -245,6 +317,10 @@ pub const Client = struct {
};
}
fn stop(self: *Client) void {
posix.shutdown(self.socket, .recv) catch {};
}
fn httpLoop(self: *Client, http: *HttpClient) !void {
lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{});
while (true) {

View File

@@ -20,7 +20,7 @@ const std = @import("std");
const TestHTTPServer = @This();
shutdown: bool,
shutdown: std.atomic.Value(bool),
listener: ?std.net.Server,
handler: Handler,
@@ -28,33 +28,44 @@ const Handler = *const fn (req: *std.http.Server.Request) anyerror!void;
pub fn init(handler: Handler) TestHTTPServer {
return .{
.shutdown = true,
.shutdown = .init(false),
.listener = null,
.handler = handler,
};
}
pub fn deinit(self: *TestHTTPServer) void {
self.shutdown = true;
pub fn stop(self: *TestHTTPServer) void {
self.shutdown.store(true, .release);
if (self.listener) |*listener| {
listener.deinit();
// Close the socket to unblock accept(), but don't call deinit()
// which does memset and causes a data race with the running thread.
std.posix.close(listener.stream.handle);
}
}
pub fn deinit(self: *TestHTTPServer) void {
self.listener = null;
}
pub fn run(self: *TestHTTPServer, wg: *std.Thread.WaitGroup) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 9582);
self.listener = try address.listen(.{ .reuse_address = true });
var listener = &self.listener.?;
// Make listener nonblocking so accept() doesn't block indefinitely
_ = try std.posix.fcntl(listener.stream.handle, std.posix.F.SETFL, @as(u32, @bitCast(std.posix.O{ .NONBLOCK = true })));
wg.finish();
while (true) {
while (!self.shutdown.load(.acquire)) {
const conn = listener.accept() catch |err| {
if (self.shutdown) {
return;
if (err == error.WouldBlock) {
std.Thread.sleep(10 * std.time.ns_per_ms);
continue;
}
return err;
// Socket was closed in stop()
return;
};
const thrd = try std.Thread.spawn(.{}, handleConnection, .{ self, conn });
thrd.detach();

View File

@@ -1,257 +0,0 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const Allocator = std.mem.Allocator;
const log = @import("log.zig");
const ThreadPool = @This();
allocator: Allocator,
active: u16,
shutdown: bool,
max_threads: u16,
lock: std.Thread.RwLock,
threads: std.DoublyLinkedList,
const Func = struct {
ptr: *const fn (*anyopaque) void,
args: []u8,
alignment: std.mem.Alignment,
fn init(allocator: Allocator, func: anytype, args: anytype) !Func {
const Args = @TypeOf(args);
const Wrapper = struct {
fn call(ctx: *anyopaque) void {
const a: *Args = @ptrCast(@alignCast(ctx));
@call(.auto, func, a.*);
}
};
const alignment: std.mem.Alignment = .of(Args);
const size = @sizeOf(Args);
if (size == 0) {
return .{
.ptr = Wrapper.call,
.args = &.{},
.alignment = alignment,
};
}
const args_buf = try allocator.alignedAlloc(u8, alignment, size);
const bytes: []const u8 = @ptrCast((&args)[0..1]);
@memcpy(args_buf, bytes);
return .{
.ptr = Wrapper.call,
.args = args_buf,
.alignment = alignment,
};
}
fn call(self: Func) void {
self.ptr(@ptrCast(self.args.ptr));
}
fn free(self: Func, allocator: Allocator) void {
if (self.args.len > 0) {
allocator.rawFree(self.args, self.alignment, @returnAddress());
}
}
};
const Worker = struct {
run_fn: Func,
shutdown_fn: Func,
pool: *ThreadPool,
thread: std.Thread,
node: std.DoublyLinkedList.Node,
fn run(self: *Worker) void {
self.run_fn.call();
self.deinit();
}
fn deinit(self: *Worker) void {
const pool = self.pool;
pool.lock.lock();
pool.threads.remove(&self.node);
pool.active -= 1;
pool.lock.unlock();
self.run_fn.free(pool.allocator);
self.shutdown_fn.free(pool.allocator);
pool.allocator.destroy(self);
}
fn callShutdown(self: *Worker) void {
self.shutdown_fn.call();
}
};
pub fn init(allocator: Allocator, max_threads: u16) ThreadPool {
return .{
.allocator = allocator,
.max_threads = max_threads,
.active = 0,
.shutdown = false,
.threads = .{},
.lock = .{},
};
}
pub fn deinit(self: *ThreadPool) void {
self.join();
}
/// Spawn a thread to run run_func(run_args). shutdown_func is called during join().
pub fn spawn(
self: *ThreadPool,
run_func: anytype,
run_args: std.meta.ArgsTuple(@TypeOf(run_func)),
shutdown_func: anytype,
shutdown_args: std.meta.ArgsTuple(@TypeOf(shutdown_func)),
) !void {
const run_fn = try Func.init(self.allocator, run_func, run_args);
errdefer run_fn.free(self.allocator);
const shutdown_fn = try Func.init(self.allocator, shutdown_func, shutdown_args);
errdefer shutdown_fn.free(self.allocator);
const worker = try self.allocator.create(Worker);
errdefer self.allocator.destroy(worker);
worker.* = .{
.run_fn = run_fn,
.shutdown_fn = shutdown_fn,
.pool = self,
.thread = undefined,
.node = .{},
};
self.lock.lock();
defer self.lock.unlock();
if (self.shutdown) {
return error.PoolShuttingDown;
}
if (self.active >= self.max_threads) {
return error.MaxThreadsReached;
}
self.threads.append(&worker.node);
self.active += 1;
worker.thread = std.Thread.spawn(.{}, Worker.run, .{worker}) catch |err| {
self.threads.remove(&worker.node);
self.active -= 1;
return err;
};
}
/// Number of active threads.
pub fn count(self: *ThreadPool) u16 {
self.lock.lockShared();
defer self.lock.unlockShared();
return self.active;
}
/// Wait for all threads to finish.
pub fn join(self: *ThreadPool) void {
self.lock.lock();
self.shutdown = true;
// Call shutdown on all active workers
var node = self.threads.first;
while (node) |n| {
const worker: *Worker = @fieldParentPtr("node", n);
worker.callShutdown();
node = n.next;
}
self.lock.unlock();
while (true) {
self.lock.lockShared();
const active = self.active;
self.lock.unlockShared();
if (active == 0) break;
std.Thread.sleep(10 * std.time.ns_per_ms);
}
}
pub fn isShuttingDown(self: *ThreadPool) bool {
self.lock.lockShared();
defer self.lock.unlockShared();
return self.shutdown;
}
// Tests
const testing = std.testing;
fn noop() void {}
fn increment(counter: *std.atomic.Value(u32)) void {
_ = counter.fetchAdd(1, .acq_rel);
}
fn block(flag: *std.atomic.Value(bool)) void {
while (!flag.load(.acquire)) {
std.Thread.sleep(1 * std.time.ns_per_ms);
}
}
fn unblock(flag: *std.atomic.Value(bool)) void {
flag.store(true, .release);
}
test "ThreadPool: spawn and join" {
var counter = std.atomic.Value(u32).init(0);
var pool = ThreadPool.init(testing.allocator, 4);
defer pool.deinit();
try pool.spawn(increment, .{&counter}, noop, .{});
try pool.spawn(increment, .{&counter}, noop, .{});
try pool.spawn(increment, .{&counter}, noop, .{});
pool.join();
try testing.expectEqual(@as(u32, 3), counter.load(.acquire));
try testing.expectEqual(@as(u16, 0), pool.count());
}
test "ThreadPool: max threads limit" {
var flag = std.atomic.Value(bool).init(false);
var pool = ThreadPool.init(testing.allocator, 2);
defer pool.deinit();
try pool.spawn(block, .{&flag}, unblock, .{&flag});
try pool.spawn(block, .{&flag}, unblock, .{&flag});
try testing.expectError(error.MaxThreadsReached, pool.spawn(block, .{&flag}, unblock, .{&flag}));
try testing.expectEqual(@as(u16, 2), pool.count());
// deinit will call unblock via shutdown callback
}

View File

@@ -24,7 +24,6 @@ const ArenaAllocator = std.heap.ArenaAllocator;
const js = @import("js/js.zig");
const log = @import("../log.zig");
const App = @import("../App.zig");
const Http = @import("../http/Http.zig");
const HttpClient = @import("../http/Client.zig");
const ArenaPool = App.ArenaPool;
@@ -40,8 +39,8 @@ const Session = @import("Session.zig");
const Browser = @This();
env: js.Env,
app: *App,
http_client: *HttpClient,
user_agent: []const u8,
session: ?Session,
allocator: Allocator,
arena_pool: *ArenaPool,
@@ -61,9 +60,9 @@ pub fn init(allocator: Allocator, app: *App, http_client: *HttpClient) !Browser
errdefer notification.deinit();
return .{
.app = app,
.env = env,
.http_client = http_client,
.user_agent = app.http.user_agent,
.session = null,
.allocator = allocator,
.notification = notification,

View File

@@ -277,7 +277,6 @@ fn reset(self: *Page, comptime initializing: bool) !void {
self._arena_pool_leak_track.clearRetainingCapacity();
}
// We force a garbage collection between page navigations to keep v8
// memory usage as low as possible.
self._session.browser.env.memoryPressureNotification(.moderate);

View File

@@ -27,7 +27,7 @@ _pad: bool = false,
pub const init: Navigator = .{};
pub fn getUserAgent(_: *const Navigator, page: *Page) []const u8 {
return page._session.browser.app.http.user_agent;
return page._session.browser.user_agent;
}
pub fn getAppName(_: *const Navigator) []const u8 {

View File

@@ -20,7 +20,6 @@ const std = @import("std");
pub const App = @import("App.zig");
pub const Server = @import("Server.zig");
pub const Config = @import("Config.zig");
pub const ThreadPool = @import("ThreadPool.zig");
pub const Page = @import("browser/Page.zig");
pub const Browser = @import("browser/Browser.zig");
pub const Session = @import("browser/Session.zig");

View File

@@ -442,6 +442,7 @@ pub const TrackingAllocator = struct {
allocated_bytes: usize = 0,
allocation_count: usize = 0,
reallocation_count: usize = 0,
mutex: std.Thread.Mutex = .{},
const Stats = struct {
allocated_bytes: usize,
@@ -479,6 +480,8 @@ pub const TrackingAllocator = struct {
return_address: usize,
) ?[*]u8 {
const self: *TrackingAllocator = @ptrCast(@alignCast(ctx));
self.mutex.lock();
defer self.mutex.unlock();
const result = self.parent_allocator.rawAlloc(len, alignment, return_address);
self.allocation_count += 1;
self.allocated_bytes += len;
@@ -493,6 +496,8 @@ pub const TrackingAllocator = struct {
ra: usize,
) bool {
const self: *TrackingAllocator = @ptrCast(@alignCast(ctx));
self.mutex.lock();
defer self.mutex.unlock();
const result = self.parent_allocator.rawResize(old_mem, alignment, new_len, ra);
self.reallocation_count += 1; // TODO: only if result is not null?
return result;
@@ -505,6 +510,8 @@ pub const TrackingAllocator = struct {
ra: usize,
) void {
const self: *TrackingAllocator = @ptrCast(@alignCast(ctx));
self.mutex.lock();
defer self.mutex.unlock();
self.parent_allocator.rawFree(old_mem, alignment, ra);
self.free_count += 1;
}
@@ -517,6 +524,8 @@ pub const TrackingAllocator = struct {
ret_addr: usize,
) ?[*]u8 {
const self: *TrackingAllocator = @ptrCast(@alignCast(ctx));
self.mutex.lock();
defer self.mutex.unlock();
const result = self.parent_allocator.rawRemap(memory, alignment, new_len, ret_addr);
self.reallocation_count += 1; // TODO: only if result is not null?
return result;

View File

@@ -450,7 +450,9 @@ const TestHTTPServer = @import("TestHTTPServer.zig");
const Server = @import("Server.zig");
var test_cdp_server: ?Server = null;
var test_cdp_server_thread: ?std.Thread = null;
var test_http_server: ?TestHTTPServer = null;
var test_http_server_thread: ?std.Thread = null;
const test_config = Config{
.mode = .{ .serve = .{
@@ -480,16 +482,10 @@ test "tests:beforeAll" {
var wg: std.Thread.WaitGroup = .{};
wg.startMany(2);
{
const thread = try std.Thread.spawn(.{}, serveCDP, .{&wg});
thread.detach();
}
test_cdp_server_thread = try std.Thread.spawn(.{}, serveCDP, .{&wg});
test_http_server = TestHTTPServer.init(testHTTPHandler);
{
const thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg });
thread.detach();
}
test_http_server_thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg });
// need to wait for the servers to be listening, else tests will fail because
// they aren't able to connect.
@@ -497,9 +493,21 @@ test "tests:beforeAll" {
}
test "tests:afterAll" {
if (test_cdp_server) |*server| {
server.stop();
}
if (test_cdp_server_thread) |thread| {
thread.join();
}
if (test_cdp_server) |*server| {
server.deinit();
}
if (test_http_server) |*server| {
server.stop();
}
if (test_http_server_thread) |thread| {
thread.join();
}
if (test_http_server) |*server| {
server.deinit();
}
@@ -515,9 +523,6 @@ fn serveCDP(wg: *std.Thread.WaitGroup) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 9583);
const test_allocator = @import("root").tracking_allocator;
test_cdp_server = try Server.init(test_allocator, test_app, address);
var server = try Server.init(test_allocator, test_app, address);
defer server.deinit();
wg.finish();
test_cdp_server.?.run(address, 5) catch |err| {