From 98124e97aa37b8e89a5b59e61985fa629593e261 Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Wed, 4 Mar 2026 19:59:22 +0000 Subject: [PATCH] Replace epoll to poll --- src/Net.zig | 133 +++++++++++++++++++++---------------------------- src/Server.zig | 5 +- 2 files changed, 58 insertions(+), 80 deletions(-) diff --git a/src/Net.zig b/src/Net.zig index 58070233..dce96644 100644 --- a/src/Net.zig +++ b/src/Net.zig @@ -19,7 +19,6 @@ const std = @import("std"); const builtin = @import("builtin"); const posix = std.posix; -const linux = std.os.linux; const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; const libcurl = @import("sys/libcurl.zig"); @@ -1552,42 +1551,17 @@ pub const RuntimeEvent = struct { events: u32, }; -pub const Runtime = if (builtin.target.os.tag == .linux) LinuxRuntime else struct { - pub const READABLE: u32 = 0; - pub const WRITABLE: u32 = 0; - pub const ERROR: u32 = 0; - pub const EventCallback = *const fn (ctx: *anyopaque, event: RuntimeEvent) anyerror!void; - pub fn init(_: Allocator) !@This() { - return error.UnsupportedPlatform; - } - pub fn deinit(_: *@This()) void {} - pub fn add(_: *@This(), _: posix.fd_t, _: u32, _: *anyopaque, _: EventCallback) !void { - return error.UnsupportedPlatform; - } - pub fn remove(_: *@This(), _: posix.fd_t) void {} - pub fn removeByCtx(_: *@This(), _: *anyopaque) void {} - pub fn dispatch(_: *@This(), _: i32) !usize { - return error.UnsupportedPlatform; - } - pub fn dispatchFor(_: *@This(), _: i32, _: ?posix.fd_t) !bool { - return error.UnsupportedPlatform; - } - pub fn run(_: *@This(), _: *anyopaque, _: *const fn (ctx: *anyopaque) bool) !void { - return error.UnsupportedPlatform; - } -}; +pub const Runtime = PollRuntime; var runtime_active = std.atomic.Value(bool).init(false); -const LinuxRuntime = struct { - pub const READABLE: u32 = linux.EPOLL.IN; - pub const WRITABLE: u32 = linux.EPOLL.OUT; - pub const ERROR: u32 = linux.EPOLL.ERR | linux.EPOLL.HUP | linux.EPOLL.RDHUP; +const PollRuntime = struct { + pub const READABLE: u32 = @intCast(posix.POLL.IN); + pub const WRITABLE: u32 = @intCast(posix.POLL.OUT); + pub const ERROR: u32 = @intCast(posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL); allocator: Allocator, - epoll_fd: posix.fd_t, watchers: std.AutoHashMapUnmanaged(posix.fd_t, Watcher) = .empty, - events: [64]posix.system.epoll_event = undefined, const Self = @This(); @@ -1605,42 +1579,23 @@ const LinuxRuntime = struct { } errdefer _ = runtime_active.swap(false, .acq_rel); - const epoll_fd = try posix.epoll_create1(linux.EPOLL.CLOEXEC); - errdefer posix.close(epoll_fd); - return .{ .allocator = allocator, - .epoll_fd = epoll_fd, }; } pub fn deinit(self: *Self) void { self.watchers.deinit(self.allocator); - posix.close(self.epoll_fd); _ = runtime_active.swap(false, .acq_rel); } pub fn add(self: *Self, fd: posix.fd_t, events: u32, ctx: *anyopaque, cb: EventCallback) !void { - var ev = posix.system.epoll_event{ - .events = events, - .data = .{ .fd = fd }, - }; - const gop = try self.watchers.getOrPut(self.allocator, fd); - if (gop.found_existing) { - gop.value_ptr.* = .{ .events = events, .ctx = ctx, .cb = cb }; - try posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_MOD, fd, &ev); - return; - } - - errdefer _ = self.watchers.remove(fd); gop.value_ptr.* = .{ .events = events, .ctx = ctx, .cb = cb }; - try posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_ADD, fd, &ev); } pub fn remove(self: *Self, fd: posix.fd_t) void { _ = self.watchers.remove(fd); - posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_DEL, fd, null) catch {}; } pub fn removeByCtx(self: *Self, ctx: *anyopaque) void { @@ -1665,46 +1620,70 @@ const LinuxRuntime = struct { } pub fn dispatch(self: *Self, timeout_ms: i32) !usize { - const n = posix.epoll_wait(self.epoll_fd, self.events[0..], timeout_ms); - var i: usize = 0; - while (i < n) : (i += 1) { - const ev = self.events[i]; - const fd = ev.data.fd; - - const watcher = self.watchers.get(fd) orelse continue; - watcher.cb(watcher.ctx, .{ - .fd = fd, - .events = ev.events, - }) catch |err| { - log.err(.app, "runtime callback", .{ .err = err, .fd = fd }); - self.remove(fd); - }; - } - return n; + var triggered: usize = 0; + _ = try self.dispatchPoll(timeout_ms, null, &triggered); + return triggered; } pub fn dispatchFor(self: *Self, timeout_ms: i32, watched_fd: ?posix.fd_t) !bool { - const n = posix.epoll_wait(self.epoll_fd, self.events[0..], timeout_ms); + var triggered: usize = 0; + return self.dispatchPoll(timeout_ms, watched_fd, &triggered); + } + + fn dispatchPoll(self: *Self, timeout_ms: i32, watched_fd: ?posix.fd_t, triggered: *usize) !bool { + var pollfds: std.ArrayList(posix.pollfd) = .empty; + defer pollfds.deinit(self.allocator); + + var it = self.watchers.iterator(); + while (it.next()) |entry| { + try pollfds.append(self.allocator, .{ + .fd = entry.key_ptr.*, + .events = @intCast(entry.value_ptr.events), + .revents = 0, + }); + } + + if (watched_fd) |wfd| { + if (!self.watchers.contains(wfd)) { + try pollfds.append(self.allocator, .{ + .fd = wfd, + .events = @intCast(Self.READABLE | Self.ERROR), + .revents = 0, + }); + } + } + + if (pollfds.items.len == 0) { + if (timeout_ms > 0) { + std.Thread.sleep(@as(u64, @intCast(timeout_ms)) * std.time.ns_per_ms); + } + return false; + } + + _ = try posix.poll(pollfds.items, timeout_ms); + var watched_ready = false; - var i: usize = 0; - while (i < n) : (i += 1) { - const ev = self.events[i]; - const fd = ev.data.fd; + for (pollfds.items) |pfd| { + if (pfd.revents == 0) continue; + triggered.* += 1; + + const revents_u32: u32 = @intCast(pfd.revents); if (watched_fd) |wfd| { - if (fd == wfd) { + if (pfd.fd == wfd) { watched_ready = true; } } - const watcher = self.watchers.get(fd) orelse continue; + const watcher = self.watchers.get(pfd.fd) orelse continue; watcher.cb(watcher.ctx, .{ - .fd = fd, - .events = ev.events, + .fd = pfd.fd, + .events = revents_u32, }) catch |err| { - log.err(.app, "runtime callback", .{ .err = err, .fd = fd }); - self.remove(fd); + log.err(.app, "runtime callback", .{ .err = err, .fd = pfd.fd }); + self.remove(pfd.fd); }; } + return watched_ready; } }; diff --git a/src/Server.zig b/src/Server.zig index ada4a576..d0b7593f 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -133,9 +133,8 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { try runtime.add(listener, Net.Runtime.READABLE, &accept_ctx, onListenerEvent); defer runtime.remove(listener); - runtime.run(self, shouldStopRuntime) catch |err| switch (err) { - error.UnsupportedPlatform => return err, - else => return err, + runtime.run(self, shouldStopRuntime) catch |err| { + return err; }; log.info(.app, "server stopped", .{});