Create multi interface in Runtime on demand

This commit is contained in:
Nikolay Govorov
2026-03-12 13:23:06 +00:00
parent a6d699ad5d
commit 51fb08e6aa

View File

@@ -60,8 +60,8 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 },
shutdown: std.atomic.Value(bool) = .init(false), shutdown: std.atomic.Value(bool) = .init(false),
// Async HTTP requests (e.g. telemetry) // Async HTTP requests (e.g. telemetry). Created on demand.
multi: *libcurl.CurlM, multi: ?*libcurl.CurlM = null,
submission_mutex: std.Thread.Mutex = .{}, submission_mutex: std.Thread.Mutex = .{},
submission_queue: std.DoublyLinkedList = .{}, submission_queue: std.DoublyLinkedList = .{},
@@ -193,9 +193,6 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime {
const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer libcurl.curl_multi_cleanup(multi) catch {};
// 0 is wakeup, 1 is listener, rest for curl fds // 0 is wakeup, 1 is listener, rest for curl fds
const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent());
errdefer allocator.free(pollfds); errdefer allocator.free(pollfds);
@@ -228,7 +225,6 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime {
.config = config, .config = config,
.ca_blob = ca_blob, .ca_blob = ca_blob,
.multi = multi,
.pollfds = pollfds, .pollfds = pollfds,
.wakeup_pipe = pipe, .wakeup_pipe = pipe,
@@ -241,7 +237,9 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime {
} }
pub fn deinit(self: *Runtime) void { pub fn deinit(self: *Runtime) void {
libcurl.curl_multi_cleanup(self.multi) catch {}; if (self.multi) |multi| {
libcurl.curl_multi_cleanup(multi) catch {};
}
for (&self.wakeup_pipe) |*fd| { for (&self.wakeup_pipe) |*fd| {
if (fd.* >= 0) { if (fd.* >= 0) {
@@ -309,15 +307,17 @@ pub fn run(self: *Runtime) void {
while (true) { while (true) {
self.drainQueue(); self.drainQueue();
// Kickstart newly added handles (DNS/connect) so that if (self.multi) |multi| {
// curl registers its sockets before we poll. // Kickstart newly added handles (DNS/connect) so that
libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { // curl registers its sockets before we poll.
lp.log.err(.app, "curl perform", .{ .err = err }); libcurl.curl_multi_perform(multi, &running_handles) catch |err| {
}; lp.log.err(.app, "curl perform", .{ .err = err });
};
self.preparePollFds(); self.preparePollFds(multi);
}
const timeout = self.getCurlTimeout(); const timeout = if (self.multi != null) self.getCurlTimeout() else @as(i32, -1);
_ = posix.poll(self.pollfds, timeout) catch |err| { _ = posix.poll(self.pollfds, timeout) catch |err| {
lp.log.err(.app, "poll", .{ .err = err }); lp.log.err(.app, "poll", .{ .err = err });
@@ -337,11 +337,13 @@ pub fn run(self: *Runtime) void {
self.acceptConnections(); self.acceptConnections();
} }
// Drive transfers and process completions. if (self.multi) |multi| {
libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { // Drive transfers and process completions.
lp.log.err(.app, "curl perform", .{ .err = err }); libcurl.curl_multi_perform(multi, &running_handles) catch |err| {
}; lp.log.err(.app, "curl perform", .{ .err = err });
self.processCompletions(); };
self.processCompletions(multi);
}
if (self.shutdown.load(.acquire) and running_handles == 0) if (self.shutdown.load(.acquire) and running_handles == 0)
break; break;
@@ -376,6 +378,17 @@ fn drainQueue(self: *Runtime) void {
self.submission_mutex.lock(); self.submission_mutex.lock();
defer self.submission_mutex.unlock(); defer self.submission_mutex.unlock();
if (self.submission_queue.first == null) return;
const multi = self.multi orelse blk: {
const m = libcurl.curl_multi_init() orelse {
lp.assert(false, "curl multi init failed", .{});
unreachable;
};
self.multi = m;
break :blk m;
};
while (self.submission_queue.popFirst()) |node| { while (self.submission_queue.popFirst()) |node| {
const conn: *net_http.Connection = @fieldParentPtr("node", node); const conn: *net_http.Connection = @fieldParentPtr("node", node);
conn.setPrivate(conn) catch |err| { conn.setPrivate(conn) catch |err| {
@@ -383,7 +396,7 @@ fn drainQueue(self: *Runtime) void {
self.releaseConnection(conn); self.releaseConnection(conn);
continue; continue;
}; };
libcurl.curl_multi_add_handle(self.multi, conn.easy) catch |err| { libcurl.curl_multi_add_handle(multi, conn.easy) catch |err| {
lp.log.err(.app, "curl multi add", .{ .err = err }); lp.log.err(.app, "curl multi add", .{ .err = err });
self.releaseConnection(conn); self.releaseConnection(conn);
}; };
@@ -425,26 +438,27 @@ fn acceptConnections(self: *Runtime) void {
} }
} }
fn preparePollFds(self: *Runtime) void { fn preparePollFds(self: *Runtime, multi: *libcurl.CurlM) void {
const curl_fds = self.pollfds[PSEUDO_POLLFDS..]; const curl_fds = self.pollfds[PSEUDO_POLLFDS..];
@memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 }); @memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 });
var fd_count: c_uint = 0; var fd_count: c_uint = 0;
const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds); const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds);
libcurl.curl_multi_waitfds(self.multi, wait_fds, &fd_count) catch |err| { libcurl.curl_multi_waitfds(multi, wait_fds, &fd_count) catch |err| {
lp.log.err(.app, "curl waitfds", .{ .err = err }); lp.log.err(.app, "curl waitfds", .{ .err = err });
}; };
} }
fn getCurlTimeout(self: *Runtime) i32 { fn getCurlTimeout(self: *Runtime) i32 {
const multi = self.multi orelse return -1;
var timeout_ms: c_long = -1; var timeout_ms: c_long = -1;
libcurl.curl_multi_timeout(self.multi, &timeout_ms) catch return -1; libcurl.curl_multi_timeout(multi, &timeout_ms) catch return -1;
return @intCast(@min(timeout_ms, std.math.maxInt(i32))); return @intCast(@min(timeout_ms, std.math.maxInt(i32)));
} }
fn processCompletions(self: *Runtime) void { fn processCompletions(self: *Runtime, multi: *libcurl.CurlM) void {
var msgs_in_queue: c_int = 0; var msgs_in_queue: c_int = 0;
while (libcurl.curl_multi_info_read(self.multi, &msgs_in_queue)) |msg| { while (libcurl.curl_multi_info_read(multi, &msgs_in_queue)) |msg| {
switch (msg.data) { switch (msg.data) {
.done => |maybe_err| { .done => |maybe_err| {
if (maybe_err) |err| { if (maybe_err) |err| {
@@ -460,7 +474,7 @@ fn processCompletions(self: *Runtime) void {
lp.assert(false, "curl getinfo private", .{}); lp.assert(false, "curl getinfo private", .{});
const conn: *net_http.Connection = @ptrCast(@alignCast(ptr)); const conn: *net_http.Connection = @ptrCast(@alignCast(ptr));
libcurl.curl_multi_remove_handle(self.multi, easy) catch {}; libcurl.curl_multi_remove_handle(multi, easy) catch {};
self.releaseConnection(conn); self.releaseConnection(conn);
} }
} }