From 51fb08e6aa822223eed9e7128329eb161f92d440 Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Thu, 12 Mar 2026 13:23:06 +0000 Subject: [PATCH] Create multi interface in Runtime on demand --- src/network/Runtime.zig | 66 +++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 58db4e10..b1b606d8 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -60,8 +60,8 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, shutdown: std.atomic.Value(bool) = .init(false), -// Async HTTP requests (e.g. telemetry) -multi: *libcurl.CurlM, +// Async HTTP requests (e.g. telemetry). Created on demand. +multi: ?*libcurl.CurlM = null, submission_mutex: std.Thread.Mutex = .{}, submission_queue: std.DoublyLinkedList = .{}, @@ -193,9 +193,6 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; - errdefer libcurl.curl_multi_cleanup(multi) catch {}; - // 0 is wakeup, 1 is listener, rest for curl fds const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); errdefer allocator.free(pollfds); @@ -228,7 +225,6 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { .config = config, .ca_blob = ca_blob, - .multi = multi, .pollfds = pollfds, .wakeup_pipe = pipe, @@ -241,7 +237,9 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { } pub fn deinit(self: *Runtime) void { - libcurl.curl_multi_cleanup(self.multi) catch {}; + if (self.multi) |multi| { + libcurl.curl_multi_cleanup(multi) catch {}; + } for (&self.wakeup_pipe) |*fd| { if (fd.* >= 0) { @@ -309,15 +307,17 @@ pub fn run(self: *Runtime) void { while (true) { self.drainQueue(); - // Kickstart newly added handles (DNS/connect) so that - // curl registers its sockets before we poll. - libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { - lp.log.err(.app, "curl perform", .{ .err = err }); - }; + if (self.multi) |multi| { + // Kickstart newly added handles (DNS/connect) so that + // curl registers its sockets before we poll. + libcurl.curl_multi_perform(multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); + }; - self.preparePollFds(); + self.preparePollFds(multi); + } - const timeout = self.getCurlTimeout(); + const timeout = if (self.multi != null) self.getCurlTimeout() else @as(i32, -1); _ = posix.poll(self.pollfds, timeout) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); @@ -337,11 +337,13 @@ pub fn run(self: *Runtime) void { self.acceptConnections(); } - // Drive transfers and process completions. - libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { - lp.log.err(.app, "curl perform", .{ .err = err }); - }; - self.processCompletions(); + if (self.multi) |multi| { + // Drive transfers and process completions. + libcurl.curl_multi_perform(multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); + }; + self.processCompletions(multi); + } if (self.shutdown.load(.acquire) and running_handles == 0) break; @@ -376,6 +378,17 @@ fn drainQueue(self: *Runtime) void { self.submission_mutex.lock(); defer self.submission_mutex.unlock(); + if (self.submission_queue.first == null) return; + + const multi = self.multi orelse blk: { + const m = libcurl.curl_multi_init() orelse { + lp.assert(false, "curl multi init failed", .{}); + unreachable; + }; + self.multi = m; + break :blk m; + }; + while (self.submission_queue.popFirst()) |node| { const conn: *net_http.Connection = @fieldParentPtr("node", node); conn.setPrivate(conn) catch |err| { @@ -383,7 +396,7 @@ fn drainQueue(self: *Runtime) void { self.releaseConnection(conn); continue; }; - libcurl.curl_multi_add_handle(self.multi, conn.easy) catch |err| { + libcurl.curl_multi_add_handle(multi, conn.easy) catch |err| { lp.log.err(.app, "curl multi add", .{ .err = err }); self.releaseConnection(conn); }; @@ -425,26 +438,27 @@ fn acceptConnections(self: *Runtime) void { } } -fn preparePollFds(self: *Runtime) void { +fn preparePollFds(self: *Runtime, multi: *libcurl.CurlM) void { const curl_fds = self.pollfds[PSEUDO_POLLFDS..]; @memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 }); var fd_count: c_uint = 0; const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds); - libcurl.curl_multi_waitfds(self.multi, wait_fds, &fd_count) catch |err| { + libcurl.curl_multi_waitfds(multi, wait_fds, &fd_count) catch |err| { lp.log.err(.app, "curl waitfds", .{ .err = err }); }; } fn getCurlTimeout(self: *Runtime) i32 { + const multi = self.multi orelse return -1; var timeout_ms: c_long = -1; - libcurl.curl_multi_timeout(self.multi, &timeout_ms) catch return -1; + libcurl.curl_multi_timeout(multi, &timeout_ms) catch return -1; return @intCast(@min(timeout_ms, std.math.maxInt(i32))); } -fn processCompletions(self: *Runtime) void { +fn processCompletions(self: *Runtime, multi: *libcurl.CurlM) void { var msgs_in_queue: c_int = 0; - while (libcurl.curl_multi_info_read(self.multi, &msgs_in_queue)) |msg| { + while (libcurl.curl_multi_info_read(multi, &msgs_in_queue)) |msg| { switch (msg.data) { .done => |maybe_err| { if (maybe_err) |err| { @@ -460,7 +474,7 @@ fn processCompletions(self: *Runtime) void { lp.assert(false, "curl getinfo private", .{}); const conn: *net_http.Connection = @ptrCast(@alignCast(ptr)); - libcurl.curl_multi_remove_handle(self.multi, easy) catch {}; + libcurl.curl_multi_remove_handle(multi, easy) catch {}; self.releaseConnection(conn); } }