Implement multi-cdp architecture

This commit is contained in:
Nikolay Govorov
2026-02-17 15:47:02 +00:00
parent 424deb8faf
commit 4f21d8d7a8

View File

@@ -41,6 +41,12 @@ allocator: Allocator,
listener: ?posix.socket_t, listener: ?posix.socket_t,
json_version_response: []const u8, json_version_response: []const u8,
// Thread management
active_threads: std.atomic.Value(u32) = .init(0),
clients: std.ArrayList(*Client) = .{},
client_mutex: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client),
pub fn init(app: *App, address: net.Address) !Server { pub fn init(app: *App, address: net.Address) !Server {
const allocator = app.allocator; const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address); const json_version_response = try buildJSONVersionResponse(allocator, address);
@@ -51,6 +57,7 @@ pub fn init(app: *App, address: net.Address) !Server {
.listener = null, .listener = null,
.allocator = allocator, .allocator = allocator,
.json_version_response = json_version_response, .json_version_response = json_version_response,
.clients_pool = std.heap.MemoryPool(Client).init(app.allocator),
}; };
} }
@@ -60,6 +67,15 @@ pub fn stop(self: *Server) void {
return; return;
} }
// Shutdown all active clients
{
self.client_mutex.lock();
defer self.client_mutex.unlock();
for (self.clients.items) |client| {
client.stop();
}
}
// Linux and BSD/macOS handle canceling a socket blocked on accept differently. // Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF).
@@ -80,15 +96,18 @@ pub fn deinit(self: *Server) void {
self.stop(); self.stop();
} }
self.joinThreads();
if (self.listener) |listener| { if (self.listener) |listener| {
posix.close(listener); posix.close(listener);
self.listener = null; self.listener = null;
} }
self.clients.deinit(self.allocator);
self.clients_pool.deinit();
self.allocator.free(self.json_version_response); self.allocator.free(self.json_version_response);
} }
pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC; const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
self.listener = listener; self.listener = listener;
@@ -98,7 +117,7 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
} }
try posix.bind(listener, &address.any, address.getOsSockLen()); try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, 1); try posix.listen(listener, self.app.config.maxPendingConnections());
log.info(.app, "server running", .{ .address = address }); log.info(.app, "server running", .{ .address = address });
while (!self.shutdown.load(.acquire)) { while (!self.shutdown.load(.acquire)) {
@@ -108,6 +127,10 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
log.info(.app, "server stopped", .{}); log.info(.app, "server stopped", .{});
break; break;
}, },
error.WouldBlock => {
std.Thread.sleep(10 * std.time.ns_per_ms);
continue;
},
else => { else => {
log.err(.app, "CDP accept", .{ .err = err }); log.err(.app, "CDP accept", .{ .err = err });
std.Thread.sleep(std.time.ns_per_s); std.Thread.sleep(std.time.ns_per_s);
@@ -115,19 +138,26 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void {
}, },
} }
}; };
if (self.shutdown.load(.acquire)) {
return error.ShuttingDown;
}
const thread = try std.Thread.spawn(.{}, handleConnection, .{ self, socket, timeout_ms }); self.spawnWorker(socket, timeout_ms) catch |err| {
thread.join(); log.err(.app, "CDP spawn", .{ .err = err });
posix.close(socket);
};
} }
} }
fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer posix.close(socket); defer posix.close(socket);
var client = Client.init( // Client is HUGE (> 512KB) because it has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const client = self.getClient() catch |err| {
log.err(.app, "CDP client create", .{ .err = err });
return;
};
defer self.releaseClient(client);
client.* = Client.init(
socket, socket,
self.allocator, self.allocator,
self.app, self.app,
@@ -139,6 +169,9 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void
}; };
defer client.deinit(); defer client.deinit();
self.registerClient(client);
defer self.unregisterClient(client);
// Check shutdown after registering to avoid missing stop() signal. // Check shutdown after registering to avoid missing stop() signal.
// If stop() already iterated over clients, this client won't receive stop() // If stop() already iterated over clients, this client won't receive stop()
// and would block joinThreads() indefinitely. // and would block joinThreads() indefinitely.
@@ -149,6 +182,77 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void
client.start(); client.start();
} }
fn getClient(self: *Server) !*Client {
self.client_mutex.lock();
defer self.client_mutex.unlock();
return self.clients_pool.create();
}
fn releaseClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.clients_pool.destroy(client);
}
fn registerClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
self.clients.append(self.allocator, client) catch {};
}
fn unregisterClient(self: *Server, client: *Client) void {
self.client_mutex.lock();
defer self.client_mutex.unlock();
for (self.clients.items, 0..) |c, i| {
if (c == client) {
_ = self.clients.swapRemove(i);
break;
}
}
}
fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
if (self.shutdown.load(.acquire)) {
return error.ShuttingDown;
}
// Atomically increment active_threads only if below max_connections.
// Uses CAS loop to avoid race between checking the limit and incrementing.
//
// cmpxchgWeak may fail for two reasons:
// 1. Another thread changed the value (increment or decrement)
// 2. Spurious failure on some architectures (e.g. ARM)
//
// We use Weak instead of Strong because we need a retry loop anyway:
// if CAS fails because a thread finished (counter decreased), we should
// retry rather than return an error - there may now be room for a new connection.
//
// On failure, cmpxchgWeak returns the actual value, which we reuse to avoid
// an extra load on the next iteration.
const max_connections = self.app.config.maxConnections();
var current = self.active_threads.load(.monotonic);
while (current < max_connections) {
current = self.active_threads.cmpxchgWeak(current, current + 1, .monotonic, .monotonic) orelse break;
} else {
return error.MaxThreadsReached;
}
errdefer _ = self.active_threads.fetchSub(1, .monotonic);
const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket, timeout_ms });
thread.detach();
}
fn runWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
defer _ = self.active_threads.fetchSub(1, .monotonic);
handleConnection(self, socket, timeout_ms);
}
fn joinThreads(self: *Server) void {
while (self.active_threads.load(.monotonic) > 0) {
std.Thread.sleep(10 * std.time.ns_per_ms);
}
}
// Handle exactly one TCP connection. // Handle exactly one TCP connection.
pub const Client = struct { pub const Client = struct {
// The client is initially serving HTTP requests but, under normal circumstances // The client is initially serving HTTP requests but, under normal circumstances