diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 4dc7647d..7dc56303 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -31,6 +31,17 @@ const RobotStore = @import("Robots.zig").RobotStore; const Runtime = @This(); +// stop() and run() synchronize access to the listener field through a syscall +// chain: posix.shutdown -> poll wakeup -> accept error. TSAN cannot infer this +// happens-before relationship, so we annotate it explicitly. +const tsan = if (builtin.sanitize_thread) struct { + extern fn __tsan_acquire(addr: *anyopaque) void; + extern fn __tsan_release(addr: *anyopaque) void; +} else struct { + inline fn __tsan_acquire(_: *anyopaque) void {} + inline fn __tsan_release(_: *anyopaque) void {} +}; + const Listener = struct { socket: posix.socket_t, ctx: *anyopaque, @@ -44,10 +55,9 @@ ca_blob: ?net_http.Blob, robot_store: RobotStore, pollfds: [Config.MAX_LISTENERS]posix.pollfd = @splat(.{ .fd = -1, .events = 0, .revents = 0 }), -listeners: [Config.MAX_LISTENERS]?Listener = @splat(null), +listener: ?Listener = null, shutdown: std.atomic.Value(bool) = .init(false), -listener_count: std.atomic.Value(usize) = .init(0), fn globalInit() void { libcurl.curl_global_init(.{ .ssl = true }) catch |err| { @@ -106,78 +116,71 @@ pub fn bind( try posix.bind(listener, &address.any, address.getOsSockLen()); try posix.listen(listener, self.config.maxPendingConnections()); - for (&self.listeners, &self.pollfds) |*slot, *pfd| { - if (slot.* == null) { - slot.* = .{ - .socket = listener, - .ctx = ctx, - .onAccept = onAccept, - }; - pfd.* = .{ - .fd = listener, - .events = posix.POLL.IN, - .revents = 0, - }; - _ = self.listener_count.fetchAdd(1, .release); - return; - } - } + if (self.listener != null) return error.TooManyListeners; - return error.TooManyListeners; + self.listener = .{ + .socket = listener, + .ctx = ctx, + .onAccept = onAccept, + }; + self.pollfds[0] = .{ + .fd = listener, + .events = posix.POLL.IN, + .revents = 0, + }; } pub fn run(self: *Runtime) void { - while (!self.shutdown.load(.acquire) and self.listener_count.load(.acquire) > 0) { + while (!self.shutdown.load(.acquire)) { + const listener = self.listener orelse return; + _ = posix.poll(&self.pollfds, -1) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); continue; }; - for (&self.listeners, &self.pollfds) |*slot, *pfd| { - if (pfd.revents == 0) continue; - pfd.revents = 0; - const listener = slot.* orelse continue; + tsan.__tsan_acquire(@ptrCast(&self.listener)); - const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening, error.ConnectionAborted => { - pfd.* = .{ .fd = -1, .events = 0, .revents = 0 }; - slot.* = null; - _ = self.listener_count.fetchSub(1, .release); - }, - error.WouldBlock => {}, - else => { - lp.log.err(.app, "accept", .{ .err = err }); - }, - } - continue; - }; + if (self.pollfds[0].revents == 0) continue; + self.pollfds[0].revents = 0; - listener.onAccept(listener.ctx, socket); - } + const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { + switch (err) { + error.SocketNotListening, error.ConnectionAborted => { + self.pollfds[0] = .{ .fd = -1, .events = 0, .revents = 0 }; + self.listener = null; + }, + error.WouldBlock => {}, + else => { + lp.log.err(.app, "accept", .{ .err = err }); + }, + } + continue; + }; + + listener.onAccept(listener.ctx, socket); } } pub fn stop(self: *Runtime) void { self.shutdown.store(true, .release); + // Wake up poll() by shutting down/closing the listener socket. + // Do NOT modify listener/pollfds here — run() handles cleanup in its + // accept error paths, avoiding data races with poll() and the run loop. + // // Linux and BSD/macOS handle canceling a socket blocked on accept differently. // For Linux, we use posix.shutdown, which will cause accept to return error.SocketNotListening (EINVAL). // For BSD, shutdown will return an error. Instead we call posix.close, which will result with error.ConnectionAborted (EBADF). - for (&self.listeners, &self.pollfds) |*slot, *pfd| { - if (slot.*) |listener| { - switch (builtin.target.os.tag) { - .linux => posix.shutdown(listener.socket, .recv) catch |err| { - lp.log.warn(.app, "listener shutdown", .{ .err = err }); - }, - .macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket), - else => unreachable, - } - - pfd.* = .{ .fd = -1, .events = 0, .revents = 0 }; - slot.* = null; - _ = self.listener_count.fetchSub(1, .release); + if (self.listener) |listener| { + switch (builtin.target.os.tag) { + .linux => posix.shutdown(listener.socket, .recv) catch |err| { + lp.log.warn(.app, "listener shutdown", .{ .err = err }); + }, + .macos, .freebsd, .netbsd, .openbsd => posix.close(listener.socket), + else => unreachable, } + tsan.__tsan_release(@ptrCast(&self.listener)); } } diff --git a/src/network/http.zig b/src/network/http.zig index fe2f1034..28fd7736 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -369,7 +369,7 @@ pub const Connection = struct { pub fn setProxyCredentials(self: *const Connection, creds: [:0]const u8) !void { try libcurl.curl_easy_setopt(self.easy, .proxy_user_pwd, creds.ptr); } - + pub fn setCredentials(self: *const Connection, creds: [:0]const u8) !void { try libcurl.curl_easy_setopt(self.easy, .user_pwd, creds.ptr); }