diff --git a/src/App.zig b/src/App.zig index f3bb0b07..64e51482 100644 --- a/src/App.zig +++ b/src/App.zig @@ -40,6 +40,7 @@ telemetry: Telemetry, allocator: Allocator, app_dir_path: ?[]const u8, notification: *Notification, +shutdown: bool = false, pub const RunMode = enum { help, @@ -99,9 +100,14 @@ pub fn init(allocator: Allocator, config: Config) !*App { } pub fn deinit(self: *App) void { + if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { + return; + } + const allocator = self.allocator; if (self.app_dir_path) |app_dir_path| { allocator.free(app_dir_path); + self.app_dir_path = null; } self.telemetry.deinit(); self.notification.deinit(); diff --git a/src/Server.zig b/src/Server.zig index 2d0ae830..23edacab 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -38,7 +38,7 @@ const MAX_MESSAGE_SIZE = 512 * 1024 + 14 + 140; const Server = @This(); app: *App, -shutdown: bool, +shutdown: bool = false, allocator: Allocator, client: ?posix.socket_t, listener: ?posix.socket_t, @@ -53,16 +53,36 @@ pub fn init(app: *App, address: net.Address) !Server { .app = app, .client = null, .listener = null, - .shutdown = false, .allocator = allocator, .json_version_response = json_version_response, }; } +/// Interrupts the server so that main can complete normally and call all defer handlers. +pub fn stop(self: *Server) void { + if (@atomicRmw(bool, &self.shutdown, .Xchg, true, .monotonic)) { + return; + } + + // Linux and BSD/macOS handle canceling a socket blocked on accept differently. + // For Linux, we use std.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). + // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (BADF). + if (self.listener) |listener| switch (builtin.target.os.tag) { + .linux => posix.shutdown(listener, .recv) catch |err| { + log.warn(.app, "listener shutdown", .{ .err = err }); + }, + .macos, .freebsd, .netbsd, .openbsd => { + self.listener = null; + posix.close(listener); + }, + else => unreachable, + }; +} + pub fn deinit(self: *Server) void { - self.shutdown = true; if (self.listener) |listener| { posix.close(listener); + self.listener = null; } // *if* server.run is running, we should really wait for it to return // before existing from here. @@ -83,14 +103,19 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { try posix.listen(listener, 1); log.info(.app, "server running", .{ .address = address }); - while (true) { + while (!@atomicLoad(bool, &self.shutdown, .monotonic)) { const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { - if (self.shutdown) { - return; + switch (err) { + error.SocketNotListening, error.ConnectionAborted => { + log.info(.app, "server stopped", .{}); + break; + }, + else => { + log.err(.app, "CDP accept", .{ .err = err }); + std.Thread.sleep(std.time.ns_per_s); + continue; + }, } - log.err(.app, "CDP accept", .{ .err = err }); - std.Thread.sleep(std.time.ns_per_s); - continue; }; self.client = socket; diff --git a/src/Sighandler.zig b/src/Sighandler.zig new file mode 100644 index 00000000..2b2d7f29 --- /dev/null +++ b/src/Sighandler.zig @@ -0,0 +1,107 @@ +// Copyright (C) 2023-2025 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +//! This structure processes operating system signals (SIGINT, SIGTERM) +//! and runs callbacks to clean up the system gracefully. +//! +//! The structure does not clear the memory allocated in the arena, +//! clear the entire arena when exiting the program. +const std = @import("std"); +const assert = std.debug.assert; +const Allocator = std.mem.Allocator; +const lp = @import("lightpanda"); + +const log = lp.log; + +const SigHandler = @This(); + +arena: Allocator, + +sigset: std.posix.sigset_t = undefined, +handle_thread: ?std.Thread = null, + +attempt: u32 = 0, +listeners: std.ArrayList(Listener) = .empty, + +pub const Listener = struct { + args: []const u8, + start: *const fn (context: *const anyopaque) void, +}; + +pub fn install(self: *SigHandler) !void { + // Block SIGINT and SIGTERM for the current thread and all created from it + self.sigset = std.posix.sigemptyset(); + std.posix.sigaddset(&self.sigset, std.posix.SIG.INT); + std.posix.sigaddset(&self.sigset, std.posix.SIG.TERM); + std.posix.sigaddset(&self.sigset, std.posix.SIG.QUIT); + std.posix.sigprocmask(std.posix.SIG.BLOCK, &self.sigset, null); + + self.handle_thread = try std.Thread.spawn(.{ .allocator = self.arena }, SigHandler.sighandle, .{self}); + self.handle_thread.?.detach(); +} + +pub fn on(self: *SigHandler, func: anytype, args: std.meta.ArgsTuple(@TypeOf(func))) !void { + assert(@typeInfo(@TypeOf(func)).@"fn".return_type.? == void); + + const Args = @TypeOf(args); + const TypeErased = struct { + fn start(context: *const anyopaque) void { + const args_casted: *const Args = @ptrCast(@alignCast(context)); + @call(.auto, func, args_casted.*); + } + }; + + const buffer = try self.arena.alignedAlloc(u8, .of(Args), @sizeOf(Args)); + errdefer self.arena.free(buffer); + + const bytes: []const u8 = @ptrCast((&args)[0..1]); + @memcpy(buffer, bytes); + + try self.listeners.append(self.arena, .{ + .args = buffer, + .start = TypeErased.start, + }); +} + +fn sighandle(self: *SigHandler) noreturn { + while (true) { + var sig: c_int = 0; + + const rc = std.c.sigwait(&self.sigset, &sig); + if (rc != 0) { + log.err(.app, "Unable to process signal {}", .{rc}); + std.process.exit(1); + } + + switch (sig) { + std.posix.SIG.INT, std.posix.SIG.TERM => { + if (self.attempt > 1) { + std.process.exit(1); + } + self.attempt += 1; + + log.info(.app, "Received termination signal...", .{}); + for (self.listeners.items) |*item| { + item.start(item.args.ptr); + } + continue; + }, + else => continue, + } + } +} diff --git a/src/main.zig b/src/main.zig index d0f909fb..19da7a3c 100644 --- a/src/main.zig +++ b/src/main.zig @@ -23,32 +23,34 @@ const Allocator = std.mem.Allocator; const log = lp.log; const App = lp.App; - -var _app: ?*App = null; -var _server: ?lp.Server = null; +const SigHandler = @import("Sighandler.zig"); pub fn main() !void { // allocator // - in Debug mode we use the General Purpose Allocator to detect memory leaks // - in Release mode we use the c allocator - var gpa: std.heap.DebugAllocator(.{}) = .init; - const allocator = if (builtin.mode == .Debug) gpa.allocator() else std.heap.c_allocator; + var gpa_instance: std.heap.DebugAllocator(.{}) = .init; + const gpa = if (builtin.mode == .Debug) gpa_instance.allocator() else std.heap.c_allocator; defer if (builtin.mode == .Debug) { - if (gpa.detectLeaks()) std.posix.exit(1); + if (gpa_instance.detectLeaks()) std.posix.exit(1); }; // arena for main-specific allocations - var main_arena = std.heap.ArenaAllocator.init(allocator); - defer main_arena.deinit(); + var main_arena_instance = std.heap.ArenaAllocator.init(gpa); + const main_arena = main_arena_instance.allocator(); + defer main_arena_instance.deinit(); - run(allocator, main_arena.allocator()) catch |err| { + var sighandler = SigHandler{ .arena = main_arena }; + try sighandler.install(); + + run(gpa, main_arena, &sighandler) catch |err| { log.fatal(.app, "exit", .{ .err = err }); std.posix.exit(1); }; } -fn run(allocator: Allocator, main_arena: Allocator) !void { +fn run(allocator: Allocator, main_arena: Allocator, sighandler: *SigHandler) !void { const args = try parseArgs(main_arena); switch (args.mode) { @@ -82,7 +84,7 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { }; // _app is global to handle graceful shutdown. - _app = try App.init(allocator, .{ + var app = try App.init(allocator, .{ .run_mode = args.mode, .http_proxy = args.httpProxy(), .proxy_bearer_token = args.proxyBearerToken(), @@ -94,7 +96,6 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { .user_agent = user_agent, }); - const app = _app.?; defer app.deinit(); app.telemetry.record(.{ .run = {} }); @@ -107,10 +108,11 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { }; // _server is global to handle graceful shutdown. - _server = try lp.Server.init(app, address); - const server = &_server.?; + var server = try lp.Server.init(app, address); defer server.deinit(); + try sighandler.on(lp.Server.stop, .{&server}); + // max timeout of 1 week. const timeout = if (opts.timeout > 604_800) 604_800_000 else @as(u32, opts.timeout) * 1000; server.run(address, timeout) catch |err| { @@ -746,31 +748,3 @@ fn parseCommonArg( return false; } - -// Handle app shutdown gracefuly on signals. -fn shutdown() void { - const sigaction: std.posix.Sigaction = .{ - .handler = .{ - .handler = struct { - pub fn handler(_: c_int) callconv(.c) void { - // Shutdown service gracefuly. - if (_server) |server| { - server.deinit(); - } - if (_app) |app| { - app.deinit(); - } - std.posix.exit(0); - } - }.handler, - }, - .mask = std.posix.empty_sigset, - .flags = 0, - }; - // Exit the program on SIGINT signal. When running the browser in a Docker - // container, sending a CTRL-C (SIGINT) signal is catched but doesn't exit - // the program. Here we force exiting on SIGINT. - std.posix.sigaction(std.posix.SIG.INT, &sigaction, null); - std.posix.sigaction(std.posix.SIG.TERM, &sigaction, null); - std.posix.sigaction(std.posix.SIG.QUIT, &sigaction, null); -}