Only one listener in network.Runtime

This commit is contained in:
Nikolay Govorov
2026-03-10 02:23:24 +00:00
parent 687f577562
commit 60350efa10
2 changed files with 57 additions and 54 deletions

View File

@@ -31,6 +31,17 @@ const RobotStore = @import("Robots.zig").RobotStore;
const Runtime = @This(); const Runtime = @This();
// stop() and run() synchronize access to the listener field through a syscall
// chain: posix.shutdown -> poll wakeup -> accept error. TSAN cannot infer this
// happens-before relationship, so we annotate it explicitly.
const tsan = if (builtin.sanitize_thread) struct {
extern fn __tsan_acquire(addr: *anyopaque) void;
extern fn __tsan_release(addr: *anyopaque) void;
} else struct {
inline fn __tsan_acquire(_: *anyopaque) void {}
inline fn __tsan_release(_: *anyopaque) void {}
};
const Listener = struct { const Listener = struct {
socket: posix.socket_t, socket: posix.socket_t,
ctx: *anyopaque, ctx: *anyopaque,
@@ -44,10 +55,9 @@ ca_blob: ?net_http.Blob,
robot_store: RobotStore, robot_store: RobotStore,
pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }), pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }),
listeners: [Config.MAX_LISTENERS]?Listener = @splat(null), listener: ?Listener = null,
shutdown: std.atomic.Value(bool) = .init(false), shutdown: std.atomic.Value(bool) = .init(false),
listener_count: std.atomic.Value(usize) = .init(0),
fn globalInit() void { fn globalInit() void {
libcurl.curl_global_init(.{ .ssl = true }) catch |err| { libcurl.curl_global_init(.{ .ssl = true }) catch |err| {
@@ -106,44 +116,39 @@ pub fn bind(
try posix.bind(listener, &address.any, address.getOsSockLen()); try posix.bind(listener, &address.any, address.getOsSockLen());
try posix.listen(listener, self.config.maxPendingConnections()); try posix.listen(listener, self.config.maxPendingConnections());
for (&self.listeners, &self.pollfds) |*slot, *pfd| { if (self.listener != null) return error.TooManyListeners;
if (slot.* == null) {
slot.* = .{ self.listener = .{
.socket = listener, .socket = listener,
.ctx = ctx, .ctx = ctx,
.onAccept = onAccept, .onAccept = onAccept,
}; };
pfd.* = .{ self.pollfds[0] = .{
.fd = listener, .fd = listener,
.events = posix.POLL.IN, .events = posix.POLL.IN,
.revents = 0, .revents = 0,
}; };
_ = self.listener_count.fetchAdd(1, .release);
return;
}
}
return error.TooManyListeners;
} }
pub fn run(self: *Runtime) void { pub fn run(self: *Runtime) void {
while (!self.shutdown.load(.acquire) and self.listener_count.load(.acquire) > 0) { while (!self.shutdown.load(.acquire)) {
const listener = self.listener orelse return;
_ = posix.poll(&self.pollfds, -1) catch |err| { _ = posix.poll(&self.pollfds, -1) catch |err| {
lp.log.err(.app, "poll", .{ .err = err }); lp.log.err(.app, "poll", .{ .err = err });
continue; continue;
}; };
for (&self.listeners, &self.pollfds) |*slot, *pfd| { tsan.__tsan_acquire(@ptrCast(&self.listener));
if (pfd.revents == 0) continue;
pfd.revents = 0; if (self.pollfds[0].revents == 0) continue;
const listener = slot.* orelse continue; self.pollfds[0].revents = 0;
const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| {
switch (err) { switch (err) {
error.SocketNotListening, error.ConnectionAborted => { error.SocketNotListening, error.ConnectionAborted => {
pfd.* = .{ .fd = -1, .events = 0, .revents = 0 }; self.pollfds[0] = .{ .fd = -1, .events = 0, .revents = 0 };
slot.* = null; self.listener = null;
_ = self.listener_count.fetchSub(1, .release);
}, },
error.WouldBlock => {}, error.WouldBlock => {},
else => { else => {
@@ -156,16 +161,18 @@ pub fn run(self: *Runtime) void {
listener.onAccept(listener.ctx, socket); listener.onAccept(listener.ctx, socket);
} }
} }
}
pub fn stop(self: *Runtime) void { pub fn stop(self: *Runtime) void {
self.shutdown.store(true, .release); self.shutdown.store(true, .release);
// Wake up poll() by shutting down/closing the listener socket.
// Do NOT modify listener/pollfds here — run() handles cleanup in its
// accept error paths, avoiding data races with poll() and the run loop.
//
// Linux and BSD/macOS handle canceling a socket blocked on accept differently. // Linux and BSD/macOS handle canceling a socket blocked on accept differently.
// For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL).
// For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF).
for (&self.listeners, &self.pollfds) |*slot, *pfd| { if (self.listener) |listener| {
if (slot.*) |listener| {
switch (builtin.target.os.tag) { switch (builtin.target.os.tag) {
.linux => posix.shutdown(listener.socket, .recv) catch |err| { .linux => posix.shutdown(listener.socket, .recv) catch |err| {
lp.log.warn(.app, "listener shutdown", .{ .err = err }); lp.log.warn(.app, "listener shutdown", .{ .err = err });
@@ -173,11 +180,7 @@ pub fn stop(self: *Runtime) void {
.macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket), .macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket),
else => unreachable, else => unreachable,
} }
tsan.__tsan_release(@ptrCast(&self.listener));
pfd.* = .{ .fd = -1, .events = 0, .revents = 0 };
slot.* = null;
_ = self.listener_count.fetchSub(1, .release);
}
} }
} }