From dd1c758c0ea81442f301843cfbe6a9bc8d138b2b Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Thu, 26 Feb 2026 07:51:47 +0000 Subject: [PATCH] Use common epoll for all net operations --- src/Net.zig | 328 ++++++++++++++++++++++++++++++++++++++++++++ src/Server.zig | 63 ++++++--- src/http/Client.zig | 8 ++ src/sys/libcurl.zig | 42 ++++++ 4 files changed, 423 insertions(+), 18 deletions(-) diff --git a/src/Net.zig b/src/Net.zig index bd723be6..58070233 100644 --- a/src/Net.zig +++ b/src/Net.zig @@ -19,6 +19,7 @@ const std = @import("std"); const builtin = @import("builtin"); const posix = std.posix; +const linux = std.os.linux; const Allocator = std.mem.Allocator; const ArenaAllocator = std.heap.ArenaAllocator; const libcurl = @import("sys/libcurl.zig"); @@ -476,9 +477,17 @@ pub const Handles = struct { available: HandleList, multi: *libcurl.CurlM, performing: bool = false, + runtime_ctx: ?RuntimeContext = null, pub const HandleList = std.DoublyLinkedList; + const RuntimeContext = struct { + handles: *Handles, + runtime: *Runtime, + cdp_fd: ?posix.fd_t, + timer_deadline_ms: ?i64 = null, + }; + pub fn init( allocator: Allocator, ca_blob: ?libcurl.CurlBlob, @@ -532,6 +541,9 @@ pub const Handles = struct { pub fn add(self: *Handles, conn: *const Connection) !void { try libcurl.curl_multi_add_handle(self.multi, conn.easy); + if (self.runtime_ctx != null) { + _ = try self.socketActionTimeout(); + } } pub fn remove(self: *Handles, conn: *Connection) void { @@ -554,6 +566,10 @@ pub const Handles = struct { } pub fn perform(self: *Handles) !c_int { + if (self.runtime_ctx != null) { + return self.socketActionTimeout(); + } + self.performing = true; defer self.performing = false; @@ -579,6 +595,34 @@ pub const Handles = struct { } pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { + if (self.runtime_ctx) |*ctx| { + var wait_ms: i32 = @intCast(timeout_ms); + try self.runDueTimer(); + + if (ctx.timer_deadline_ms) |deadline| { + const now = std.time.milliTimestamp(); + if (deadline > now) { + const remaining: i32 = @intCast(@min(deadline - now, std.math.maxInt(i32))); + wait_ms = @min(wait_ms, remaining); + } else { + wait_ms = 0; + } + } + + const watched_fd: ?posix.fd_t = if (extra_fds.len > 0) extra_fds[0].fd else ctx.cdp_fd; + const watched_ready = try ctx.runtime.dispatchFor(wait_ms, watched_fd); + + if (extra_fds.len > 0) { + extra_fds[0].revents = .{}; + if (watched_ready) { + extra_fds[0].revents.pollin = true; + } + } + + try self.runDueTimer(); + return; + } + try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); } @@ -598,6 +642,128 @@ pub const Handles = struct { else => unreachable, }; } + + pub fn attachRuntime(self: *Handles, runtime: *Runtime, cdp_fd: ?posix.fd_t) !void { + self.detachRuntime(); + self.runtime_ctx = .{ + .handles = self, + .runtime = runtime, + .cdp_fd = cdp_fd, + .timer_deadline_ms = null, + }; + + const ctx = &self.runtime_ctx.?; + try libcurl.curl_multi_setopt(self.multi, .socket_function, onCurlSocket); + try libcurl.curl_multi_setopt(self.multi, .socket_data, ctx); + try libcurl.curl_multi_setopt(self.multi, .timer_function, onCurlTimer); + try libcurl.curl_multi_setopt(self.multi, .timer_data, ctx); + _ = try self.socketActionTimeout(); + } + + pub fn detachRuntime(self: *Handles) void { + if (self.runtime_ctx) |*ctx| { + ctx.runtime.removeByCtx(ctx); + } + libcurl.curl_multi_setopt(self.multi, .socket_function, @as(?*const libcurl.CurlSocketCallback, null)) catch {}; + libcurl.curl_multi_setopt(self.multi, .socket_data, @as(?*anyopaque, null)) catch {}; + libcurl.curl_multi_setopt(self.multi, .timer_function, @as(?*const libcurl.CurlTimerCallback, null)) catch {}; + libcurl.curl_multi_setopt(self.multi, .timer_data, @as(?*anyopaque, null)) catch {}; + self.runtime_ctx = null; + } + + fn runDueTimer(self: *Handles) !void { + const ctx = &(self.runtime_ctx orelse return); + const deadline = ctx.timer_deadline_ms orelse return; + if (std.time.milliTimestamp() < deadline) { + return; + } + ctx.timer_deadline_ms = null; + _ = try self.socketActionTimeout(); + } + + fn socketAction(self: *Handles, fd: posix.fd_t, events: u32) !c_int { + const select_mask = runtimeEventsToCurlSelect(events).toC(); + var running: c_int = undefined; + self.performing = true; + defer self.performing = false; + try libcurl.curl_multi_socket_action(self.multi, @intCast(fd), select_mask, &running); + return running; + } + + fn socketActionTimeout(self: *Handles) !c_int { + var running: c_int = undefined; + self.performing = true; + defer self.performing = false; + try libcurl.curl_multi_socket_action(self.multi, libcurl.CURL_SOCKET_TIMEOUT, 0, &running); + return running; + } + + fn runtimeEventsToCurlSelect(events: u32) libcurl.CurlSelectMask { + return .{ + .in = (events & Runtime.READABLE) != 0, + .out = (events & Runtime.WRITABLE) != 0, + .err = (events & Runtime.ERROR) != 0, + }; + } + + fn curlPollToRuntimeEvents(what: libcurl.CurlPoll) u32 { + return switch (what) { + .in => Runtime.READABLE | Runtime.ERROR, + .out => Runtime.WRITABLE | Runtime.ERROR, + .inout => Runtime.READABLE | Runtime.WRITABLE | Runtime.ERROR, + .remove => 0, + }; + } + + fn onRuntimeFdEvent(ctx_ptr: *anyopaque, event: RuntimeEvent) anyerror!void { + const ctx: *RuntimeContext = @ptrCast(@alignCast(ctx_ptr)); + _ = try ctx.handles.socketAction(event.fd, event.events); + } + + fn onCurlSocket( + easy: ?*libcurl.Curl, + s: libcurl.CurlSocket, + what_raw: c_int, + userp: ?*anyopaque, + socketp: ?*anyopaque, + ) callconv(.c) c_int { + _ = easy; + _ = socketp; + const ctx = userp orelse return 0; + const runtime_ctx: *RuntimeContext = @ptrCast(@alignCast(ctx)); + + const what = std.meta.intToEnum(libcurl.CurlPoll, what_raw) catch return 0; + if (what == .remove) { + runtime_ctx.runtime.remove(@intCast(s)); + return 0; + } + + runtime_ctx.runtime.add( + @intCast(s), + curlPollToRuntimeEvents(what), + runtime_ctx, + onRuntimeFdEvent, + ) catch {}; + return 0; + } + + fn onCurlTimer( + multi: ?*libcurl.CurlM, + timeout_ms: c_long, + userp: ?*anyopaque, + ) callconv(.c) c_int { + _ = multi; + const ctx = userp orelse return 0; + const runtime_ctx: *RuntimeContext = @ptrCast(@alignCast(ctx)); + + if (timeout_ms < 0) { + runtime_ctx.timer_deadline_ms = null; + return 0; + } + + runtime_ctx.timer_deadline_ms = std.time.milliTimestamp() + @as(i64, @intCast(timeout_ms)); + return 0; + } }; // TODO: on BSD / Linux, we could just read the PEM file directly. @@ -1381,6 +1547,168 @@ pub const WsConnection = struct { } }; +pub const RuntimeEvent = struct { + fd: posix.fd_t, + events: u32, +}; + +pub const Runtime = if (builtin.target.os.tag == .linux) LinuxRuntime else struct { + pub const READABLE: u32 = 0; + pub const WRITABLE: u32 = 0; + pub const ERROR: u32 = 0; + pub const EventCallback = *const fn (ctx: *anyopaque, event: RuntimeEvent) anyerror!void; + pub fn init(_: Allocator) !@This() { + return error.UnsupportedPlatform; + } + pub fn deinit(_: *@This()) void {} + pub fn add(_: *@This(), _: posix.fd_t, _: u32, _: *anyopaque, _: EventCallback) !void { + return error.UnsupportedPlatform; + } + pub fn remove(_: *@This(), _: posix.fd_t) void {} + pub fn removeByCtx(_: *@This(), _: *anyopaque) void {} + pub fn dispatch(_: *@This(), _: i32) !usize { + return error.UnsupportedPlatform; + } + pub fn dispatchFor(_: *@This(), _: i32, _: ?posix.fd_t) !bool { + return error.UnsupportedPlatform; + } + pub fn run(_: *@This(), _: *anyopaque, _: *const fn (ctx: *anyopaque) bool) !void { + return error.UnsupportedPlatform; + } +}; + +var runtime_active = std.atomic.Value(bool).init(false); + +const LinuxRuntime = struct { + pub const READABLE: u32 = linux.EPOLL.IN; + pub const WRITABLE: u32 = linux.EPOLL.OUT; + pub const ERROR: u32 = linux.EPOLL.ERR | linux.EPOLL.HUP | linux.EPOLL.RDHUP; + + allocator: Allocator, + epoll_fd: posix.fd_t, + watchers: std.AutoHashMapUnmanaged(posix.fd_t, Watcher) = .empty, + events: [64]posix.system.epoll_event = undefined, + + const Self = @This(); + + pub const EventCallback = *const fn (ctx: *anyopaque, event: RuntimeEvent) anyerror!void; + + const Watcher = struct { + events: u32, + ctx: *anyopaque, + cb: EventCallback, + }; + + pub fn init(allocator: Allocator) !Self { + if (runtime_active.swap(true, .acq_rel)) { + return error.RuntimeAlreadyActive; + } + errdefer _ = runtime_active.swap(false, .acq_rel); + + const epoll_fd = try posix.epoll_create1(linux.EPOLL.CLOEXEC); + errdefer posix.close(epoll_fd); + + return .{ + .allocator = allocator, + .epoll_fd = epoll_fd, + }; + } + + pub fn deinit(self: *Self) void { + self.watchers.deinit(self.allocator); + posix.close(self.epoll_fd); + _ = runtime_active.swap(false, .acq_rel); + } + + pub fn add(self: *Self, fd: posix.fd_t, events: u32, ctx: *anyopaque, cb: EventCallback) !void { + var ev = posix.system.epoll_event{ + .events = events, + .data = .{ .fd = fd }, + }; + + const gop = try self.watchers.getOrPut(self.allocator, fd); + if (gop.found_existing) { + gop.value_ptr.* = .{ .events = events, .ctx = ctx, .cb = cb }; + try posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_MOD, fd, &ev); + return; + } + + errdefer _ = self.watchers.remove(fd); + gop.value_ptr.* = .{ .events = events, .ctx = ctx, .cb = cb }; + try posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_ADD, fd, &ev); + } + + pub fn remove(self: *Self, fd: posix.fd_t) void { + _ = self.watchers.remove(fd); + posix.epoll_ctl(self.epoll_fd, linux.EPOLL.CTL_DEL, fd, null) catch {}; + } + + pub fn removeByCtx(self: *Self, ctx: *anyopaque) void { + while (true) { + var found: ?posix.fd_t = null; + var it = self.watchers.iterator(); + while (it.next()) |entry| { + if (entry.value_ptr.ctx == ctx) { + found = entry.key_ptr.*; + break; + } + } + const fd = found orelse break; + self.remove(fd); + } + } + + pub fn run(self: *Self, stop_ctx: *anyopaque, should_stop: *const fn (ctx: *anyopaque) bool) !void { + while (!should_stop(stop_ctx)) { + _ = try self.dispatch(200); + } + } + + pub fn dispatch(self: *Self, timeout_ms: i32) !usize { + const n = posix.epoll_wait(self.epoll_fd, self.events[0..], timeout_ms); + var i: usize = 0; + while (i < n) : (i += 1) { + const ev = self.events[i]; + const fd = ev.data.fd; + + const watcher = self.watchers.get(fd) orelse continue; + watcher.cb(watcher.ctx, .{ + .fd = fd, + .events = ev.events, + }) catch |err| { + log.err(.app, "runtime callback", .{ .err = err, .fd = fd }); + self.remove(fd); + }; + } + return n; + } + + pub fn dispatchFor(self: *Self, timeout_ms: i32, watched_fd: ?posix.fd_t) !bool { + const n = posix.epoll_wait(self.epoll_fd, self.events[0..], timeout_ms); + var watched_ready = false; + var i: usize = 0; + while (i < n) : (i += 1) { + const ev = self.events[i]; + const fd = ev.data.fd; + if (watched_fd) |wfd| { + if (fd == wfd) { + watched_ready = true; + } + } + + const watcher = self.watchers.get(fd) orelse continue; + watcher.cb(watcher.ctx, .{ + .fd = fd, + .events = ev.events, + }) catch |err| { + log.err(.app, "runtime callback", .{ .err = err, .fd = fd }); + self.remove(fd); + }; + } + return watched_ready; + } +}; + const testing = std.testing; test "mask" { diff --git a/src/Server.zig b/src/Server.zig index bd990560..ada4a576 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -121,26 +121,53 @@ pub fn run(self: *Server, address: net.Address, timeout_ms: u32) !void { try posix.listen(listener, self.app.config.maxPendingConnections()); log.info(.app, "server running", .{ .address = address }); - while (!self.shutdown.load(.acquire)) { - const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening, error.ConnectionAborted => { - log.info(.app, "server stopped", .{}); - break; - }, - error.WouldBlock => { - std.Thread.sleep(10 * std.time.ns_per_ms); - continue; - }, - else => { - log.err(.app, "CDP accept", .{ .err = err }); - std.Thread.sleep(std.time.ns_per_s); - continue; - }, - } + + var runtime = try Net.Runtime.init(self.allocator); + defer runtime.deinit(); + + var accept_ctx: AcceptCtx = .{ + .server = self, + .timeout_ms = timeout_ms, + }; + + try runtime.add(listener, Net.Runtime.READABLE, &accept_ctx, onListenerEvent); + defer runtime.remove(listener); + + runtime.run(self, shouldStopRuntime) catch |err| switch (err) { + error.UnsupportedPlatform => return err, + else => return err, + }; + + log.info(.app, "server stopped", .{}); +} + +const AcceptCtx = struct { + server: *Server, + timeout_ms: u32, +}; + +fn shouldStopRuntime(ctx: *anyopaque) bool { + const self: *Server = @ptrCast(@alignCast(ctx)); + return self.shutdown.load(.acquire); +} + +fn onListenerEvent(ctx: *anyopaque, event: Net.RuntimeEvent) !void { + _ = event; + const accept_ctx: *AcceptCtx = @ptrCast(@alignCast(ctx)); + const self = accept_ctx.server; + const listener = self.listener orelse return; + + while (true) { + const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| switch (err) { + error.WouldBlock => return, + error.SocketNotListening, error.ConnectionAborted => return, + else => { + log.err(.app, "CDP accept", .{ .err = err }); + return; + }, }; - self.spawnWorker(socket, timeout_ms) catch |err| { + self.spawnWorker(socket, accept_ctx.timeout_ms) catch |err| { log.err(.app, "CDP spawn", .{ .err = err }); posix.close(socket); }; diff --git a/src/http/Client.zig b/src/http/Client.zig index 326635f4..9b8feec4 100644 --- a/src/http/Client.zig +++ b/src/http/Client.zig @@ -173,6 +173,14 @@ pub fn newHeaders(self: *const Client) !Net.Headers { return Net.Headers.init(self.config.http_headers.user_agent_header); } +pub fn attachRuntime(self: *Client, runtime: *Net.Runtime, cdp_fd: ?posix.fd_t) !void { + try self.handles.attachRuntime(runtime, cdp_fd); +} + +pub fn detachRuntime(self: *Client) void { + self.handles.detachRuntime(); +} + pub fn abort(self: *Client) void { self._abort(true, 0); } diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index ca718ebb..73716665 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -39,7 +39,32 @@ pub const CurlOffT = c.curl_off_t; pub const CurlDebugFunction = fn (*Curl, CurlInfoType, [*c]u8, usize, *anyopaque) c_int; pub const CurlHeaderFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; pub const CurlWriteFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; +pub const CurlSocketCallback = fn (?*Curl, CurlSocket, c_int, ?*anyopaque, ?*anyopaque) callconv(.c) c_int; +pub const CurlTimerCallback = fn (?*CurlM, c_long, ?*anyopaque) callconv(.c) c_int; pub const curl_writefunc_error: usize = c.CURL_WRITEFUNC_ERROR; +pub const CURL_SOCKET_TIMEOUT: CurlSocket = c.CURL_SOCKET_TIMEOUT; + +pub const CurlPoll = enum(c_int) { + in = c.CURL_POLL_IN, + out = c.CURL_POLL_OUT, + inout = c.CURL_POLL_INOUT, + remove = c.CURL_POLL_REMOVE, +}; + +pub const CurlSelectMask = packed struct(c_int) { + in: bool = false, + out: bool = false, + err: bool = false, + _reserved: std.meta.Int(.unsigned, @bitSizeOf(c_int) - 3) = 0, + + pub fn toC(self: @This()) c_int { + var mask: c_int = 0; + if (self.in) mask |= c.CURL_CSELECT_IN; + if (self.out) mask |= c.CURL_CSELECT_OUT; + if (self.err) mask |= c.CURL_CSELECT_ERR; + return mask; + } +}; pub const CurlGlobalFlags = packed struct(u8) { ssl: bool = false, @@ -156,6 +181,10 @@ pub const CurlOption = enum(c.CURLoption) { pub const CurlMOption = enum(c.CURLMoption) { max_host_connections = c.CURLMOPT_MAX_HOST_CONNECTIONS, + socket_function = c.CURLMOPT_SOCKETFUNCTION, + socket_data = c.CURLMOPT_SOCKETDATA, + timer_function = c.CURLMOPT_TIMERFUNCTION, + timer_data = c.CURLMOPT_TIMERDATA, }; pub const CurlInfo = enum(c.CURLINFO) { @@ -675,6 +704,10 @@ pub fn curl_multi_setopt(multi: *CurlM, comptime option: CurlMOption, value: any }; break :blk c.curl_multi_setopt(multi, opt, n); }, + .socket_function => c.curl_multi_setopt(multi, opt, value), + .socket_data => c.curl_multi_setopt(multi, opt, value), + .timer_function => c.curl_multi_setopt(multi, opt, value), + .timer_data => c.curl_multi_setopt(multi, opt, value), }; try errorMCheck(code); } @@ -701,6 +734,15 @@ pub fn curl_multi_poll( try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds)); } +pub fn curl_multi_socket_action( + multi: *CurlM, + s: CurlSocket, + ev_bitmask: c_int, + running_handles: *c_int, +) ErrorMulti!void { + try errorMCheck(c.curl_multi_socket_action(multi, s, ev_bitmask, running_handles)); +} + pub fn curl_multi_info_read(multi: *CurlM, msgs_in_queue: *c_int) ?CurlMsg { const ptr = c.curl_multi_info_read(multi, msgs_in_queue); if (ptr == null) return null;