Move accept loop to common runtime

This commit is contained in:
Nikolay Govorov
2026-03-05 00:16:35 +00:00
parent 8e59ce9e9f
commit 687f577562
7 changed files with 160 additions and 106 deletions

View File

@@ -39,7 +39,6 @@ telemetry: Telemetry,
allocator: Allocator, allocator: Allocator,
arena_pool: ArenaPool, arena_pool: ArenaPool,
app_dir_path: ?[]const u8, app_dir_path: ?[]const u8,
shutdown: bool = false,
pub fn init(allocator: Allocator, config: *const Config) !*App { pub fn init(allocator: Allocator, config: *const Config) !*App {
const app = try allocator.create(App); const app = try allocator.create(App);
@@ -76,11 +75,11 @@ pub fn init(allocator: Allocator, config: *const Config) !*App {
return app; return app;
} }
pub fn deinit(self: *App) void { pub fn shutdown(self: *const App) bool {
if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { return self.network.shutdown.load(.acquire);
return; }
}
pub fn deinit(self: *App) void {
const allocator = self.allocator; const allocator = self.allocator;
if (self.app_dir_path) |app_dir_path| { if (self.app_dir_path) |app_dir_path| {
allocator.free(app_dir_path); allocator.free(app_dir_path);

View File

@@ -31,6 +31,7 @@ pub const RunMode = enum {
mcp, mcp,
}; };
pub const MAX_LISTENERS = 16;
pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096; pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096;
// max message size // max message size
@@ -153,6 +154,13 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 {
}; };
} }
pub fn cdpTimeout(self: *const Config) usize {
return switch (self.mode) {
.serve => |opts| if (opts.timeout > 604_800) 604_800_000 else @as(usize, opts.timeout) * 1000,
else => unreachable,
};
}
pub fn maxConnections(self: *const Config) u16 { pub fn maxConnections(self: *const Config) u16 {
return switch (self.mode) { return switch (self.mode) {
.serve => |opts| opts.cdp_max_connections, .serve => |opts| opts.cdp_max_connections,

View File

@@ -18,8 +18,6 @@
const std = @import("std"); const std = @import("std");
const lp = @import("lightpanda"); const lp = @import("lightpanda");
const builtin = @import("builtin");
const net = std.net; const net = std.net;
const posix = std.posix; const posix = std.posix;
@@ -36,9 +34,7 @@ const HttpClient = @import("browser/HttpClient.zig");
const Server = @This(); const Server = @This();
app: *App, app: *App,
shutdown: std.atomic.Value(bool) = .init(false),
allocator: Allocator, allocator: Allocator,
listener: ?posix.socket_t,
json_version_response: []const u8, json_version_response: []const u8,
// Thread management // Thread management
@@ -47,103 +43,52 @@ clients: std.ArrayList(*Client) = .{},
client_mutex: std.Thread.Mutex = .{}, client_mutex: std.Thread.Mutex = .{},
clients_pool: std.heap.MemoryPool(Client), clients_pool: std.heap.MemoryPool(Client),
pub fn init(app: *App, address: net.Address) !Server { pub fn init(app: *App, address: net.Address) !*Server {
const allocator = app.allocator; const allocator = app.allocator;
const json_version_response = try buildJSONVersionResponse(allocator, address); const json_version_response = try buildJSONVersionResponse(allocator, address);
errdefer allocator.free(json_version_response); errdefer allocator.free(json_version_response);
return .{ const self = try allocator.create(Server);
errdefer allocator.destroy(self);
self.* = .{
.app = app, .app = app,
.listener = null,
.allocator = allocator, .allocator = allocator,
.json_version_response = json_version_response, .json_version_response = json_version_response,
.clients_pool = std.heap.MemoryPool(Client).init(app.allocator), .clients_pool = std.heap.MemoryPool(Client).init(allocator),
}; };
try self.app.network.bind(address, self, onAccept);
log.info(.app, "server running", .{ .address = address });
return self;
} }
/// Interrupts the server so that main can complete normally and call all defer handlers. pub fn deinit(self: *Server) void {
pub fn stop(self: *Server) void { // Stop all active clients
if (self.shutdown.swap(true, .release)) {
return;
}
// Shutdown all active clients
{ {
self.client_mutex.lock(); self.client_mutex.lock();
defer self.client_mutex.unlock(); defer self.client_mutex.unlock();
for (self.clients.items) |client| { for (self.clients.items) |client| {
client.stop(); client.stop();
} }
} }
// Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF).
if (self.listener) |listener| switch (builtin.target.os.tag) {
.linux => posix.shutdown(listener, .recv) catch |err| {
log.warn(.app, "listener shutdown", .{ .err = err });
},
.macos, .freebsd, .netbsd, .openbsd => {
self.listener = null;
posix.close(listener);
},
else => unreachable,
};
}
pub fn deinit(self: *Server) void {
if (!self.shutdown.load(.acquire)) {
self.stop();
}
self.joinThreads(); self.joinThreads();
if (self.listener) |listener| {
posix.close(listener);
self.listener = null;
}
self.clients.deinit(self.allocator); self.clients.deinit(self.allocator);
self.clients_pool.deinit(); self.clients_pool.deinit();
self.allocator.free(self.json_version_response); self.allocator.free(self.json_version_response);
self.allocator.destroy(self);
} }
pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const self: *Server = @ptrCast(@alignCast(ctx));
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); const timeout_ms: u32 = @intCast(self.app.config.cdpTimeout());
self.listener = listener;
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
if (@hasDecl(posix.TCP, "NODELAY")) {
try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
}
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, self.app.config.maxPendingConnections());
log.info(.app, "server running", .{ .address = address });
while (!self.shutdown.load(.acquire)) {
const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
switch (err) {
error.SocketNotListening, error.ConnectionAborted => {
log.info(.app, "server stopped", .{});
break;
},
error.WouldBlock => {
std.Thread.sleep(10 * std.time.ns_per_ms);
continue;
},
else => {
log.err(.app, "CDP accept", .{ .err = err });
std.Thread.sleep(std.time.ns_per_s);
continue;
},
}
};
self.spawnWorker(socket, timeout_ms) catch |err| { self.spawnWorker(socket, timeout_ms) catch |err| {
log.err(.app, "CDP spawn", .{ .err = err }); log.err(.app, "CDP spawn", .{ .err = err });
posix.close(socket); posix.close(socket);
}; };
}
} }
fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void {
@@ -172,10 +117,10 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void
self.registerClient(client); self.registerClient(client);
defer self.unregisterClient(client); defer self.unregisterClient(client);
// Check shutdown after registering to avoid missing stop() signal. // Check shutdown after registering to avoid missing the stop signal.
// If stop() already iterated over clients, this client won't receive stop() // If deinit() already iterated over clients, this client won't receive stop()
// and would block joinThreads() indefinitely. // and would block joinThreads() indefinitely.
if (self.shutdown.load(.acquire)) { if (self.app.shutdown()) {
return; return;
} }
@@ -212,7 +157,7 @@ fn unregisterClient(self: *Server, client: *Client) void {
} }
fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void {
if (self.shutdown.load(.acquire)) { if (self.app.shutdown()) {
return error.ShuttingDown; return error.ShuttingDown;
} }

View File

@@ -18,6 +18,7 @@
const std = @import("std"); const std = @import("std");
pub const App = @import("App.zig"); pub const App = @import("App.zig");
pub const Network = @import("network/Runtime.zig");
pub const Server = @import("Server.zig"); pub const Server = @import("Server.zig");
pub const Config = @import("Config.zig"); pub const Config = @import("Config.zig");
pub const URL = @import("browser/URL.zig"); pub const URL = @import("browser/URL.zig");
@@ -34,7 +35,7 @@ pub const mcp = @import("mcp.zig");
pub const build_config = @import("build_config"); pub const build_config = @import("build_config");
pub const crash_handler = @import("crash_handler.zig"); pub const crash_handler = @import("crash_handler.zig");
const HttpClient = @import("browser/HttpClient.zig"); pub const HttpClient = @import("browser/HttpClient.zig");
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
pub const FetchOpts = struct { pub const FetchOpts = struct {

View File

@@ -93,18 +93,14 @@ fn run(allocator: Allocator, main_arena: Allocator) !void {
return args.printUsageAndExit(false); return args.printUsageAndExit(false);
}; };
// _server is global to handle graceful shutdown. var server = lp.Server.init(app, address) catch |err| {
var server = try lp.Server.init(app, address);
defer server.deinit();
try sighandler.on(lp.Server.stop, .{&server});
// max timeout of 1 week.
const timeout = if (opts.timeout > 604_800) 604_800_000 else @as(u32, opts.timeout) * 1000;
server.run(address, timeout) catch |err| {
log.fatal(.app, "server run error", .{ .err = err }); log.fatal(.app, "server run error", .{ .err = err });
return err; return err;
}; };
defer server.deinit();
try sighandler.on(lp.Network.stop, .{&app.network});
app.network.run();
}, },
.fetch => |opts| { .fetch => |opts| {
const url = opts.url; const url = opts.url;

View File

@@ -17,8 +17,10 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const lp = @import("lightpanda"); const lp = @import("lightpanda");
const Config = @import("../Config.zig"); const Config = @import("../Config.zig");
@@ -29,12 +31,24 @@ const RobotStore = @import("Robots.zig").RobotStore;
const Runtime = @This(); const Runtime = @This();
const Listener = struct {
socket: posix.socket_t,
ctx: *anyopaque,
onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void,
};
allocator: Allocator, allocator: Allocator,
config: *const Config, config: *const Config,
ca_blob: ?net_http.Blob, ca_blob: ?net_http.Blob,
robot_store: RobotStore, robot_store: RobotStore,
pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }),
listeners: [Config.MAX_LISTENERS]?Listener = @splat(null),
shutdown: std.atomic.Value(bool) = .init(false),
listener_count: std.atomic.Value(usize) = .init(0),
fn globalInit() void { fn globalInit() void {
libcurl.curl_global_init(.{ .ssl = true }) catch |err| { libcurl.curl_global_init(.{ .ssl = true }) catch |err| {
lp.assert(false, "curl global init", .{ .err = err }); lp.assert(false, "curl global init", .{ .err = err });
@@ -74,6 +88,99 @@ pub fn deinit(self: *Runtime) void {
global_deinit_once.call(); global_deinit_once.call();
} }
pub fn bind(
self: *Runtime,
address: net.Address,
ctx: *anyopaque,
onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void,
) !void {
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP);
errdefer posix.close(listener);
try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
if (@hasDecl(posix.TCP, "NODELAY")) {
try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1)));
}
try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, self.config.maxPendingConnections());
for (&self.listeners, &self.pollfds) |*slot, *pfd| {
if (slot.* == null) {
slot.* = .{
.socket = listener,
.ctx = ctx,
.onAccept = onAccept,
};
pfd.* = .{
.fd = listener,
.events = posix.POLL.IN,
.revents = 0,
};
_ = self.listener_count.fetchAdd(1, .release);
return;
}
}
return error.TooManyListeners;
}
pub fn run(self: *Runtime) void {
while (!self.shutdown.load(.acquire) and self.listener_count.load(.acquire) > 0) {
_ = posix.poll(&self.pollfds, -1) catch |err| {
lp.log.err(.app, "poll", .{ .err = err });
continue;
};
for (&self.listeners, &self.pollfds) |*slot, *pfd| {
if (pfd.revents == 0) continue;
pfd.revents = 0;
const listener = slot.* orelse continue;
const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| {
switch (err) {
error.SocketNotListening, error.ConnectionAborted => {
pfd.* = .{ .fd = -1, .events = 0, .revents = 0 };
slot.* = null;
_ = self.listener_count.fetchSub(1, .release);
},
error.WouldBlock => {},
else => {
lp.log.err(.app, "accept", .{ .err = err });
},
}
continue;
};
listener.onAccept(listener.ctx, socket);
}
}
}
pub fn stop(self: *Runtime) void {
self.shutdown.store(true, .release);
// Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF).
for (&self.listeners, &self.pollfds) |*slot, *pfd| {
if (slot.*) |listener| {
switch (builtin.target.os.tag) {
.linux => posix.shutdown(listener.socket, .recv) catch |err| {
lp.log.warn(.app, "listener shutdown", .{ .err = err });
},
.macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket),
else => unreachable,
}
pfd.* = .{ .fd = -1, .events = 0, .revents = 0 };
slot.* = null;
_ = self.listener_count.fetchSub(1, .release);
}
}
}
pub fn newConnection(self: *Runtime) !net_http.Connection { pub fn newConnection(self: *Runtime) !net_http.Connection {
return net_http.Connection.init(self.ca_blob, self.config); return net_http.Connection.init(self.ca_blob, self.config);
} }

View File

@@ -460,7 +460,7 @@ const log = @import("log.zig");
const TestHTTPServer = @import("TestHTTPServer.zig"); const TestHTTPServer = @import("TestHTTPServer.zig");
const Server = @import("Server.zig"); const Server = @import("Server.zig");
var test_cdp_server: ?Server = null; var test_cdp_server: ?*Server = null;
var test_cdp_server_thread: ?std.Thread = null; var test_cdp_server_thread: ?std.Thread = null;
var test_http_server: ?TestHTTPServer = null; var test_http_server: ?TestHTTPServer = null;
var test_http_server_thread: ?std.Thread = null; var test_http_server_thread: ?std.Thread = null;
@@ -509,13 +509,11 @@ test "tests:beforeAll" {
} }
test "tests:afterAll" { test "tests:afterAll" {
if (test_cdp_server) |*server| { test_app.network.stop();
server.stop();
}
if (test_cdp_server_thread) |thread| { if (test_cdp_server_thread) |thread| {
thread.join(); thread.join();
} }
if (test_cdp_server) |*server| { if (test_cdp_server) |server| {
server.deinit(); server.deinit();
} }
@@ -540,14 +538,14 @@ test "tests:afterAll" {
fn serveCDP(wg: *std.Thread.WaitGroup) !void { fn serveCDP(wg: *std.Thread.WaitGroup) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 9583); const address = try std.net.Address.parseIp("127.0.0.1", 9583);
test_cdp_server = try Server.init(test_app, address);
wg.finish(); test_cdp_server = Server.init(test_app, address) catch |err| {
test_cdp_server.?.run(address, 5) catch |err| {
std.debug.print("CDP server error: {}", .{err}); std.debug.print("CDP server error: {}", .{err});
return err; return err;
}; };
wg.finish();
test_app.network.run();
} }
fn testHTTPHandler(req: *std.http.Server.Request) !void { fn testHTTPHandler(req: *std.http.Server.Request) !void {