From 4f21d8d7a8ae1d28660fc202297ca62f5d18145d Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 17 Feb 2026 15:47:02 +0000 Subject: [PATCH] Implement multi-cdp architecture --- src/Server.zig | 120 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 112 insertions(+), 8 deletions(-) diff --git a/src/Server.zig b/src/Server.zig index 5b4a59a4..9f060a1c 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -41,6 +41,12 @@ allocator: Allocator, listener: ?posix.socket_t, json_version_response: []const u8, +// Thread management +active_threads: std.atomic.Value(u32) = .init(0), +clients: std.ArrayList(*Client) = .{}, +client_mutex: std.Thread.Mutex = .{}, +clients_pool: std.heap.MemoryPool(Client), + pub fn init(app: *App, address: net.Address) !Server { const allocator = app.allocator; const json_version_response = try buildJSONVersionResponse(allocator, address); @@ -51,6 +57,7 @@ pub fn init(app: *App, address: net.Address) !Server { .listener = null, .allocator = allocator, .json_version_response = json_version_response, + .clients_pool = std.heap.MemoryPool(Client).init(app.allocator), }; } @@ -60,6 +67,15 @@ pub fn stop(self: *Server) void { return; } + // Shutdown all active clients + { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + for (self.clients.items) |client| { + client.stop(); + } + } + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). @@ -80,15 +96,18 @@ pub fn deinit(self: *Server) void { self.stop(); } + self.joinThreads(); if (self.listener) |listener| { posix.close(listener); self.listener = null; } + self.clients.deinit(self.allocator); + self.clients_pool.deinit(); self.allocator.free(self.json_version_response); } pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); self.listener = listener; @@ -98,7 +117,7 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { } try posix.bind(listener, &address.any, address.getOsSockLen()); - try posix.listen(listener, 1); + try posix.listen(listener, self.app.config.maxPendingConnections()); log.info(.app, "server running", .{ .address = address }); while (!self.shutdown.load(.acquire)) { @@ -108,6 +127,10 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { log.info(.app, "server stopped", .{}); break; }, + error.WouldBlock => { + std.Thread.sleep(10 * std.time.ns_per_ms); + continue; + }, else => { log.err(.app, "CDP accept", .{ .err = err }); std.Thread.sleep(std.time.ns_per_s); @@ -115,19 +138,26 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { }, } }; - if (self.shutdown.load(.acquire)) { - return error.ShuttingDown; - } - const thread = try std.Thread.spawn(.{}, handleConnection, .{ self, socket, timeout_ms }); - thread.join(); + self.spawnWorker(socket, timeout_ms) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); + }; } } fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { defer posix.close(socket); - var client = Client.init( + // Client is HUGE (> 512KB) because it has a large read buffer. + // V8 crashes if this is on the stack (likely related to its size). + const client = self.getClient() catch |err| { + log.err(.app, "CDP client create", .{ .err = err }); + return; + }; + defer self.releaseClient(client); + + client.* = Client.init( socket, self.allocator, self.app, @@ -139,6 +169,9 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void }; defer client.deinit(); + self.registerClient(client); + defer self.unregisterClient(client); + // Check shutdown after registering to avoid missing stop() signal. // If stop() already iterated over clients, this client won't receive stop() // and would block joinThreads() indefinitely. @@ -149,6 +182,77 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void client.start(); } +fn getClient(self: *Server) !*Client { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + return self.clients_pool.create(); +} + +fn releaseClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + self.clients_pool.destroy(client); +} + +fn registerClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + self.clients.append(self.allocator, client) catch {}; +} + +fn unregisterClient(self: *Server, client: *Client) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + for (self.clients.items, 0..) |c, i| { + if (c == client) { + _ = self.clients.swapRemove(i); + break; + } + } +} + +fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { + if (self.shutdown.load(.acquire)) { + return error.ShuttingDown; + } + + // Atomically increment active_threads only if below max_connections. + // Uses CAS loop to avoid race between checking the limit and incrementing. + // + // cmpxchgWeak may fail for two reasons: + // 1. Another thread changed the value (increment or decrement) + // 2. Spurious failure on some architectures (e.g. ARM) + // + // We use Weak instead of Strong because we need a retry loop anyway: + // if CAS fails because a thread finished (counter decreased), we should + // retry rather than return an error - there may now be room for a new connection. + // + // On failure, cmpxchgWeak returns the actual value, which we reuse to avoid + // an extra load on the next iteration. + const max_connections = self.app.config.maxConnections(); + var current = self.active_threads.load(.monotonic); + while (current < max_connections) { + current = self.active_threads.cmpxchgWeak(current, current + 1, .monotonic, .monotonic) orelse break; + } else { + return error.MaxThreadsReached; + } + errdefer _ = self.active_threads.fetchSub(1, .monotonic); + + const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms }); + thread.detach(); +} + +fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { + defer _ = self.active_threads.fetchSub(1, .monotonic); + handleConnection(self, socket, timeout_ms); +} + +fn joinThreads(self: *Server) void { + while (self.active_threads.load(.monotonic) > 0) { + std.Thread.sleep(10 * std.time.ns_per_ms); + } +} + // Handle exactly one TCP connection. pub const Client = struct { // The client is initially serving HTTP requests but, under normal circumstances