From 687f57756244bfa44965e2eca8e112d72676a317 Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Thu, 5 Mar 2026 00:16:35 +0000 Subject: [PATCH] Move accept loop to common runtime --- src/App.zig | 9 ++-- src/Config.zig | 8 +++ src/Server.zig | 107 ++++++++++----------------------------- src/lightpanda.zig | 3 +- src/main.zig | 14 ++---- src/network/Runtime.zig | 109 +++++++++++++++++++++++++++++++++++++++- src/testing.zig | 16 +++--- 7 files changed, 160 insertions(+), 106 deletions(-) diff --git a/src/App.zig b/src/App.zig index 7faee4c8..9039cec5 100644 --- a/src/App.zig +++ b/src/App.zig @@ -39,7 +39,6 @@ telemetry: Telemetry, allocator: Allocator, arena_pool: ArenaPool, app_dir_path: ?[]const u8, -shutdown: bool = false, pub fn init(allocator: Allocator, config: *const Config) !*App { const app = try allocator.create(App); @@ -76,11 +75,11 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { return app; } -pub fn deinit(self: *App) void { - if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { - return; - } +pub fn shutdown(self: *const App) bool { + return self.network.shutdown.load(.acquire); +} +pub fn deinit(self: *App) void { const allocator = self.allocator; if (self.app_dir_path) |app_dir_path| { allocator.free(app_dir_path); diff --git a/src/Config.zig b/src/Config.zig index 5a4cc58e..a06fcc51 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -31,6 +31,7 @@ pub const RunMode = enum { mcp, }; +pub const MAX_LISTENERS = 16; pub const CDP_MAX_HTTP_REQUEST_SIZE = 4096; // max message size @@ -153,6 +154,13 @@ pub fn userAgentSuffix(self: *const Config) ?[]const u8 { }; } +pub fn cdpTimeout(self: *const Config) usize { + return switch (self.mode) { + .serve => |opts| if (opts.timeout > 604_800) 604_800_000 else @as(usize, opts.timeout) * 1000, + else => unreachable, + }; +} + pub fn maxConnections(self: *const Config) u16 { return switch (self.mode) { .serve => |opts| opts.cdp_max_connections, diff --git a/src/Server.zig b/src/Server.zig index 026dbda9..23ddefb5 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -18,8 +18,6 @@ const std = @import("std"); const lp = @import("lightpanda"); -const builtin = @import("builtin"); - const net = std.net; const posix = std.posix; @@ -36,9 +34,7 @@ const HttpClient = @import("browser/HttpClient.zig"); const Server = @This(); app: *App, -shutdown: std.atomic.Value(bool) = .init(false), allocator: Allocator, -listener: ?posix.socket_t, json_version_response: []const u8, // Thread management @@ -47,103 +43,52 @@ clients: std.ArrayList(*Client) = .{}, client_mutex: std.Thread.Mutex = .{}, clients_pool: std.heap.MemoryPool(Client), -pub fn init(app: *App, address: net.Address) !Server { +pub fn init(app: *App, address: net.Address) !*Server { const allocator = app.allocator; const json_version_response = try buildJSONVersionResponse(allocator, address); errdefer allocator.free(json_version_response); - return .{ + const self = try allocator.create(Server); + errdefer allocator.destroy(self); + + self.* = .{ .app = app, - .listener = null, .allocator = allocator, .json_version_response = json_version_response, - .clients_pool = std.heap.MemoryPool(Client).init(app.allocator), + .clients_pool = std.heap.MemoryPool(Client).init(allocator), }; + + try self.app.network.bind(address, self, onAccept); + log.info(.app, "server running", .{ .address = address }); + + return self; } -/// Interrupts the server so that main can complete normally and call all defer handlers. -pub fn stop(self: *Server) void { - if (self.shutdown.swap(true, .release)) { - return; - } - - // Shutdown all active clients +pub fn deinit(self: *Server) void { + // Stop all active clients { self.client_mutex.lock(); defer self.client_mutex.unlock(); + for (self.clients.items) |client| { client.stop(); } } - // Linux and BSD/macOS handle canceling a socket blocked on accept differently. - // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). - // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). - if (self.listener) |listener| switch (builtin.target.os.tag) { - .linux => posix.shutdown(listener, .recv) catch |err| { - log.warn(.app, "listener shutdown", .{ .err = err }); - }, - .macos, .freebsd, .netbsd, .openbsd => { - self.listener = null; - posix.close(listener); - }, - else => unreachable, - }; -} - -pub fn deinit(self: *Server) void { - if (!self.shutdown.load(.acquire)) { - self.stop(); - } - self.joinThreads(); - if (self.listener) |listener| { - posix.close(listener); - self.listener = null; - } self.clients.deinit(self.allocator); self.clients_pool.deinit(); self.allocator.free(self.json_version_response); + self.allocator.destroy(self); } -pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { - const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; - const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); - self.listener = listener; - - try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); - if (@hasDecl(posix.TCP, "NODELAY")) { - try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); - } - - try posix.bind(listener, &address.any, address.getOsSockLen()); - try posix.listen(listener, self.app.config.maxPendingConnections()); - - log.info(.app, "server running", .{ .address = address }); - while (!self.shutdown.load(.acquire)) { - const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening, error.ConnectionAborted => { - log.info(.app, "server stopped", .{}); - break; - }, - error.WouldBlock => { - std.Thread.sleep(10 * std.time.ns_per_ms); - continue; - }, - else => { - log.err(.app, "CDP accept", .{ .err = err }); - std.Thread.sleep(std.time.ns_per_s); - continue; - }, - } - }; - - self.spawnWorker(socket, timeout_ms) catch |err| { - log.err(.app, "CDP spawn", .{ .err = err }); - posix.close(socket); - }; - } +fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void { + const self: *Server = @ptrCast(@alignCast(ctx)); + const timeout_ms: u32 = @intCast(self.app.config.cdpTimeout()); + self.spawnWorker(socket, timeout_ms) catch |err| { + log.err(.app, "CDP spawn", .{ .err = err }); + posix.close(socket); + }; } fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void { @@ -172,10 +117,10 @@ fn handleConnection(self: *Server, socket: posix.socket_t, timeout_ms: u32) void self.registerClient(client); defer self.unregisterClient(client); - // Check shutdown after registering to avoid missing stop() signal. - // If stop() already iterated over clients, this client won't receive stop() + // Check shutdown after registering to avoid missing the stop signal. + // If deinit() already iterated over clients, this client won't receive stop() // and would block joinThreads() indefinitely. - if (self.shutdown.load(.acquire)) { + if (self.app.shutdown()) { return; } @@ -212,7 +157,7 @@ fn unregisterClient(self: *Server, client: *Client) void { } fn spawnWorker(self: *Server, socket: posix.socket_t, timeout_ms: u32) !void { - if (self.shutdown.load(.acquire)) { + if (self.app.shutdown()) { return error.ShuttingDown; } diff --git a/src/lightpanda.zig b/src/lightpanda.zig index e10d9e20..47b8181f 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -18,6 +18,7 @@ const std = @import("std"); pub const App = @import("App.zig"); +pub const Network = @import("network/Runtime.zig"); pub const Server = @import("Server.zig"); pub const Config = @import("Config.zig"); pub const URL = @import("browser/URL.zig"); @@ -34,7 +35,7 @@ pub const mcp = @import("mcp.zig"); pub const build_config = @import("build_config"); pub const crash_handler = @import("crash_handler.zig"); -const HttpClient = @import("browser/HttpClient.zig"); +pub const HttpClient = @import("browser/HttpClient.zig"); const IS_DEBUG = @import("builtin").mode == .Debug; pub const FetchOpts = struct { diff --git a/src/main.zig b/src/main.zig index dd6a759a..26e29b22 100644 --- a/src/main.zig +++ b/src/main.zig @@ -93,18 +93,14 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { return args.printUsageAndExit(false); }; - // _server is global to handle graceful shutdown. - var server = try lp.Server.init(app, address); - defer server.deinit(); - - try sighandler.on(lp.Server.stop, .{&server}); - - // max timeout of 1 week. - const timeout = if (opts.timeout > 604_800) 604_800_000 else @as(u32, opts.timeout) * 1000; - server.run(address, timeout) catch |err| { + var server = lp.Server.init(app, address) catch |err| { log.fatal(.app, "server run error", .{ .err = err }); return err; }; + defer server.deinit(); + + try sighandler.on(lp.Network.stop, .{&app.network}); + app.network.run(); }, .fetch => |opts| { const url = opts.url; diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 8de03760..4dc7647d 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -17,8 +17,10 @@ // along with this program. If not, see . const std = @import("std"); +const builtin = @import("builtin"); +const net = std.net; +const posix = std.posix; const Allocator = std.mem.Allocator; -const ArenaAllocator = std.heap.ArenaAllocator; const lp = @import("lightpanda"); const Config = @import("../Config.zig"); @@ -29,12 +31,24 @@ const RobotStore = @import("Robots.zig").RobotStore; const Runtime = @This(); +const Listener = struct { + socket: posix.socket_t, + ctx: *anyopaque, + onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, +}; + allocator: Allocator, config: *const Config, ca_blob: ?net_http.Blob, robot_store: RobotStore, +pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }), +listeners: [Config.MAX_LISTENERS]?Listener = @splat(null), + +shutdown: std.atomic.Value(bool) = .init(false), +listener_count: std.atomic.Value(usize) = .init(0), + fn globalInit() void { libcurl.curl_global_init(.{ .ssl = true }) catch |err| { lp.assert(false, "curl global init", .{ .err = err }); @@ -74,6 +88,99 @@ pub fn deinit(self: *Runtime) void { global_deinit_once.call(); } +pub fn bind( + self: *Runtime, + address: net.Address, + ctx: *anyopaque, + onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, +) !void { + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; + const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); + errdefer posix.close(listener); + + try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + if (@hasDecl(posix.TCP, "NODELAY")) { + try posix.setsockopt(listener, posix.IPPROTO.TCP, posix.TCP.NODELAY, &std.mem.toBytes(@as(c_int, 1))); + } + + try posix.bind(listener, &address.any, address.getOsSockLen()); + try posix.listen(listener, self.config.maxPendingConnections()); + + for (&self.listeners, &self.pollfds) |*slot, *pfd| { + if (slot.* == null) { + slot.* = .{ + .socket = listener, + .ctx = ctx, + .onAccept = onAccept, + }; + pfd.* = .{ + .fd = listener, + .events = posix.POLL.IN, + .revents = 0, + }; + _ = self.listener_count.fetchAdd(1, .release); + return; + } + } + + return error.TooManyListeners; +} + +pub fn run(self: *Runtime) void { + while (!self.shutdown.load(.acquire) and self.listener_count.load(.acquire) > 0) { + _ = posix.poll(&self.pollfds, -1) catch |err| { + lp.log.err(.app, "poll", .{ .err = err }); + continue; + }; + + for (&self.listeners, &self.pollfds) |*slot, *pfd| { + if (pfd.revents == 0) continue; + pfd.revents = 0; + const listener = slot.* orelse continue; + + const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { + switch (err) { + error.SocketNotListening, error.ConnectionAborted => { + pfd.* = .{ .fd = -1, .events = 0, .revents = 0 }; + slot.* = null; + _ = self.listener_count.fetchSub(1, .release); + }, + error.WouldBlock => {}, + else => { + lp.log.err(.app, "accept", .{ .err = err }); + }, + } + continue; + }; + + listener.onAccept(listener.ctx, socket); + } + } +} + +pub fn stop(self: *Runtime) void { + self.shutdown.store(true, .release); + + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. + // For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). + // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF). + for (&self.listeners, &self.pollfds) |*slot, *pfd| { + if (slot.*) |listener| { + switch (builtin.target.os.tag) { + .linux => posix.shutdown(listener.socket, .recv) catch |err| { + lp.log.warn(.app, "listener shutdown", .{ .err = err }); + }, + .macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket), + else => unreachable, + } + + pfd.* = .{ .fd = -1, .events = 0, .revents = 0 }; + slot.* = null; + _ = self.listener_count.fetchSub(1, .release); + } + } +} + pub fn newConnection(self: *Runtime) !net_http.Connection { return net_http.Connection.init(self.ca_blob, self.config); } diff --git a/src/testing.zig b/src/testing.zig index 6752ec2a..a398f824 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -460,7 +460,7 @@ const log = @import("log.zig"); const TestHTTPServer = @import("TestHTTPServer.zig"); const Server = @import("Server.zig"); -var test_cdp_server: ?Server = null; +var test_cdp_server: ?*Server = null; var test_cdp_server_thread: ?std.Thread = null; var test_http_server: ?TestHTTPServer = null; var test_http_server_thread: ?std.Thread = null; @@ -509,13 +509,11 @@ test "tests:beforeAll" { } test "tests:afterAll" { - if (test_cdp_server) |*server| { - server.stop(); - } + test_app.network.stop(); if (test_cdp_server_thread) |thread| { thread.join(); } - if (test_cdp_server) |*server| { + if (test_cdp_server) |server| { server.deinit(); } @@ -540,14 +538,14 @@ test "tests:afterAll" { fn serveCDP(wg: *std.Thread.WaitGroup) !void { const address = try std.net.Address.parseIp("127.0.0.1", 9583); - test_cdp_server = try Server.init(test_app, address); - wg.finish(); - - test_cdp_server.?.run(address, 5) catch |err| { + test_cdp_server = Server.init(test_app, address) catch |err| { std.debug.print("CDP server error: {}", .{err}); return err; }; + wg.finish(); + + test_app.network.run(); } fn testHTTPHandler(req: *std.http.Server.Request) !void {