// Patch summary (review commentary; text ahead of the `diff --git` header is not part of any hunk).
//
// Purpose: changes how `stop()` interrupts the `poll()` loop in `run()` in src/network/Runtime.zig.
//
// Removed by this patch:
//   - the `tsan` struct wrapping `__tsan_acquire` / `__tsan_release` and its call sites in `run()` and `stop()`;
//   - the inline `pollfds` array field of length `Config.MAX_LISTENERS`;
//   - the per-OS branch in `stop()` that called `posix.shutdown` (linux) or `posix.close`
//     (macos/freebsd/netbsd/openbsd) on the listener socket.
//
// Added by this patch:
//   - field `wakeup_pipe: [2]posix.fd_t`, defaulting to `.{ -1, -1 }`, filled in `init()` from
//     `posix.pipe2` with `NONBLOCK` and `CLOEXEC` set;
//   - `pollfds` becomes a slice of two entries allocated in `init()`: index 0 watches `pipe[0]` for
//     `POLL.IN`, index 1 is written by `bind()` with the listener fd (it starts as fd -1);
//   - `deinit()` closes each pipe fd that is >= 0, resets it to -1, and frees `pollfds`;
//   - `run()` checks `pollfds[0].revents` first, clears it, and leaves the loop when `shutdown` is set;
//     it then handles `pollfds[1]` with `posix.accept`. After the loop, if `self.listener` is still
//     non-null, it calls `posix.shutdown(.both)` (logging a warning on error) and `posix.close`;
//   - `stop()` stores `true` into `shutdown` with release ordering and writes one byte to
//     `wakeup_pipe[1]`, discarding any write error.
//
// NOTE(review): in `init()`, no `errdefer` closing the two `pipe` fds is visible between `pipe2` and
//   the later `try allocator.alloc(...)` / `try loadCerts(...)`; unless the lines elided between the
//   hunks handle it, the fds would leak on those error paths — confirm.
// NOTE(review): the shown hunks of `run()` reset `pollfds[0].revents` but contain no read from
//   `wakeup_pipe[0]`. If anything ever writes to the pipe without `shutdown` being set, `poll()` would
//   presumably keep returning immediately — confirm whether the pipe needs draining.
// NOTE(review): the added comment says "check wakeup socket", but index 0 holds the pipe read end.
// NOTE(review): after the loop the listener socket is closed, yet `self.listener` and `pollfds[1]`
//   are not reset in the shown lines — verify nothing reuses them afterwards.
diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 7dc56303..a3774650 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -31,17 +31,6 @@ const RobotStore = @import("Robots.zig").RobotStore; const Runtime = @This(); -// stop() and run() synchronize access to the listener field through a syscall -// chain: posix.shutdown -> poll wakeup -> accept error. TSAN cannot infer this -// happens-before relationship, so we annotate it explicitly. -const tsan = if (builtin.sanitize_thread) struct { - extern fn __tsan_acquire(addr: *anyopaque) void; - extern fn __tsan_release(addr: *anyopaque) void; -} else struct { - inline fn __tsan_acquire(_: *anyopaque) void {} - inline fn __tsan_release(_: *anyopaque) void {} -}; - const Listener = struct { socket: posix.socket_t, ctx: *anyopaque, @@ -54,9 +43,12 @@ config: *const Config, ca_blob: ?net_http.Blob, robot_store: RobotStore, -pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }), +pollfds: []posix.pollfd, listener: ?Listener = null, +// Wakeup pipe: workers write to [1], main thread polls [0] +wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, + shutdown: std.atomic.Value(bool) = .init(false), fn globalInit() void { @@ -76,6 +68,15 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { global_init_once.call(); errdefer global_deinit_once.call(); + const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); + + // 0 is wakeup, 1 is listener + const pollfds = try allocator.alloc(posix.pollfd, 2); + errdefer allocator.free(pollfds); + + @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); + pollfds[0] = .{ .fd = pipe[0], .events = posix.POLL.IN, .revents = 0 }; + var ca_blob: ?net_http.Blob = null; if (config.tlsVerifyHost()) { ca_blob = try loadCerts(allocator); @@ -86,10 +87,21 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { .config = config, .ca_blob = ca_blob, .robot_store = RobotStore.init(allocator), + 
.pollfds = pollfds, + .wakeup_pipe = pipe, }; } pub fn deinit(self: *Runtime) void { + for (&self.wakeup_pipe) |*fd| { + if (fd.* >= 0) { + posix.close(fd.*); + fd.* = -1; + } + } + + self.allocator.free(self.pollfds); + if (self.ca_blob) |ca_blob| { const data: [*]u8 = @ptrCast(ca_blob.data); self.allocator.free(data[0..ca_blob.len]); @@ -123,7 +135,7 @@ pub fn bind( .ctx = ctx, .onAccept = onAccept, }; - self.pollfds[0] = .{ + self.pollfds[1] = .{ .fd = listener, .events = posix.POLL.IN, .revents = 0, @@ -134,20 +146,27 @@ pub fn run(self: *Runtime) void { while (!self.shutdown.load(.acquire)) { const listener = self.listener orelse return; - _ = posix.poll(&self.pollfds, -1) catch |err| { + _ = posix.poll(self.pollfds, -1) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); continue; }; - tsan.__tsan_acquire(@ptrCast(&self.listener)); + // check wakeup socket + if (self.pollfds[0].revents != 0) { + self.pollfds[0].revents = 0; - if (self.pollfds[0].revents == 0) continue; - self.pollfds[0].revents = 0; + // If we were woken up, perhaps everything was cancelled and the iteration can be completed. 
+ if (self.shutdown.load(.acquire)) break; + } + + // check new connections; + if (self.pollfds[1].revents == 0) continue; + self.pollfds[1].revents = 0; const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { switch (err) { error.SocketNotListening, error.ConnectionAborted => { - self.pollfds[0] = .{ .fd = -1, .events = 0, .revents = 0 }; + self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; self.listener = null; }, error.WouldBlock => {}, @@ -160,28 +179,18 @@ pub fn run(self: *Runtime) void { listener.onAccept(listener.ctx, socket); } + + if (self.listener) |listener| { + posix.shutdown(listener.socket, .both) catch |err| { + lp.log.warn(.app, "listener shutdown", .{ .err = err }); + }; + posix.close(listener.socket); + } } pub fn stop(self: *Runtime) void { self.shutdown.store(true, .release); - - // Wake up poll() by shutting down/closing the listener socket. - // Do NOT modify listener/pollfds here — run() handles cleanup in its - // accept error paths, avoiding data races with poll() and the run loop. - // - // Linux and BSD/macOS handle canceling a socket blocked on accept differently. - // For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). - // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF). - if (self.listener) |listener| { - switch (builtin.target.os.tag) { - .linux => posix.shutdown(listener.socket, .recv) catch |err| { - lp.log.warn(.app, "listener shutdown", .{ .err = err }); - }, - .macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket), - else => unreachable, - } - tsan.__tsan_release(@ptrCast(&self.listener)); - } + _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; } pub fn newConnection(self: *Runtime) !net_http.Connection {