From a6d699ad5dbea208e5e58038150c3126f650aa21 Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Thu, 12 Mar 2026 10:17:26 +0000 Subject: [PATCH] Use common network runtime for telemetry messages --- src/main.zig | 44 +++++--- src/network/Runtime.zig | 198 +++++++++++++++++++++++++++++------ src/network/http.zig | 11 +- src/sys/libcurl.zig | 18 +++- src/telemetry/lightpanda.zig | 161 ++++++++++------------------ src/telemetry/telemetry.zig | 8 +- 6 files changed, 285 insertions(+), 155 deletions(-) diff --git a/src/main.zig b/src/main.zig index 1b5ffdb2..d9c1e8ee 100644 --- a/src/main.zig +++ b/src/main.zig @@ -79,18 +79,21 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { log.opts.filter_scopes = lfs; } + // must be installed before any other threads + const sighandler = try main_arena.create(SigHandler); + sighandler.* = .{ .arena = main_arena }; + try sighandler.install(); + // _app is global to handle graceful shutdown. var app = try App.init(allocator, &args); - defer app.deinit(); + + try sighandler.on(lp.Network.stop, .{&app.network}); + app.telemetry.record(.{ .run = {} }); switch (args.mode) { .serve => |opts| { - const sighandler = try main_arena.create(SigHandler); - sighandler.* = .{ .arena = main_arena }; - try sighandler.install(); - log.debug(.app, "startup", .{ .mode = "serve", .snapshot = app.snapshot.fromEmbedded() }); const address = std.net.Address.parseIp(opts.host, opts.port) catch |err| { log.fatal(.app, "invalid server address", .{ .err = err, .host = opts.host, .port = opts.port }); @@ -112,7 +115,6 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { }; defer server.deinit(); - try sighandler.on(lp.Network.stop, .{&app.network}); app.network.run(); }, .fetch => |opts| { @@ -135,10 +137,10 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { fetch_opts.writer = &writer.interface; } - lp.fetch(app, url, fetch_opts) catch |err| { - log.fatal(.app, "fetch error", .{ .err = err, .url = url }); - return err; - }; + var worker_thread = try std.Thread.spawn(.{}, fetchThread, .{ app, url, fetch_opts }); + defer worker_thread.join(); + + app.network.run(); }, .mcp => { log.info(.mcp, "starting server", .{}); @@ -150,11 +152,27 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { var mcp_server: *lp.mcp.Server = try .init(allocator, app, &stdout.interface); defer mcp_server.deinit(); - var stdin_buf: [64 * 1024]u8 = undefined; - var stdin = std.fs.File.stdin().reader(&stdin_buf); + var worker_thread = try std.Thread.spawn(.{}, mcpThread, .{ mcp_server, app }); + defer worker_thread.join(); - try lp.mcp.router.processRequests(mcp_server, &stdin.interface); + app.network.run(); }, else => unreachable, } } + +fn fetchThread(app: *App, url: [:0]const u8, fetch_opts: lp.FetchOpts) void { + defer app.network.stop(); + lp.fetch(app, url, fetch_opts) catch |err| { + log.fatal(.app, "fetch error", .{ .err = err, .url = url }); + }; +} + +fn mcpThread(mcp_server: *lp.mcp.Server, app: *App) void { + defer app.network.stop(); + var stdin_buf: [64 * 1024]u8 = undefined; + var stdin = std.fs.File.stdin().reader(&stdin_buf); + lp.mcp.router.processRequests(mcp_server, &stdin.interface) catch |err| { + log.fatal(.mcp, "mcp error", .{ .err = err }); + }; +} diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 196e2aa2..58db4e10 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -38,6 +38,9 @@ const Listener = struct { onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, }; +// Number of fixed pollfds entries (wakeup pipe + listener). +const PSEUDO_POLLFDS = 2; + allocator: Allocator, config: *const Config, @@ -57,6 +60,11 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, shutdown: std.atomic.Value(bool) = .init(false), +// Async HTTP requests (e.g. telemetry) +multi: *libcurl.CurlM, +submission_mutex: std.Thread.Mutex = .{}, +submission_queue: std.DoublyLinkedList = .{}, + const ZigToCurlAllocator = struct { // C11 requires malloc to return memory aligned to max_align_t (16 bytes on x86_64). // We match this guarantee since libcurl expects malloc-compatible alignment. @@ -185,8 +193,11 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - // 0 is wakeup, 1 is listener - const pollfds = try allocator.alloc(posix.pollfd, 2); + const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; + errdefer libcurl.curl_multi_cleanup(multi) catch {}; + + // 0 is wakeup, 1 is listener, rest for curl fds + const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); errdefer allocator.free(pollfds); @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); @@ -216,16 +227,22 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { .allocator = allocator, .config = config, .ca_blob = ca_blob, - .robot_store = RobotStore.init(allocator), - .connections = connections, - .available = available, - .web_bot_auth = web_bot_auth, + + .multi = multi, .pollfds = pollfds, .wakeup_pipe = pipe, + + .available = available, + .connections = connections, + + .robot_store = RobotStore.init(allocator), + .web_bot_auth = web_bot_auth, }; } pub fn deinit(self: *Runtime) void { + libcurl.curl_multi_cleanup(self.multi) catch {}; + for (&self.wakeup_pipe) |*fd| { if (fd.* >= 0) { posix.close(fd.*); @@ -286,44 +303,48 @@ pub fn bind( } pub fn run(self: *Runtime) void { - while (!self.shutdown.load(.acquire)) { - const listener = self.listener orelse return; + var drain_buf: [64]u8 = undefined; + var running_handles: c_int = 0; - _ = posix.poll(self.pollfds, -1) catch |err| { + while (true) { + self.drainQueue(); + + // Kickstart newly added handles (DNS/connect) so that + // curl registers its sockets before we poll. + libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); + }; + + self.preparePollFds(); + + const timeout = self.getCurlTimeout(); + + _ = posix.poll(self.pollfds, timeout) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); continue; }; - // check wakeup socket + // check wakeup pipe if (self.pollfds[0].revents != 0) { self.pollfds[0].revents = 0; - - // If we were woken up, perhaps everything was cancelled and the iteration can be completed. - if (self.shutdown.load(.acquire)) break; + while (true) + _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; } - // check new connections; - if (self.pollfds[1].revents == 0) continue; - self.pollfds[1].revents = 0; + // accept new connections + if (self.pollfds[1].revents != 0) { + self.pollfds[1].revents = 0; + self.acceptConnections(); + } - const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening => { - self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; - self.listener = null; - }, - error.ConnectionAborted => { - lp.log.warn(.app, "accept connection aborted", .{}); - }, - error.WouldBlock => {}, - else => { - lp.log.err(.app, "accept", .{ .err = err }); - }, - } - continue; + // Drive transfers and process completions. + libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); }; + self.processCompletions(); - listener.onAccept(listener.ctx, socket); + if (self.shutdown.load(.acquire) and running_handles == 0) + break; } if (self.listener) |listener| { @@ -340,9 +361,120 @@ pub fn run(self: *Runtime) void { } } +pub fn submitRequest(self: *Runtime, conn: *net_http.Connection) void { + self.submission_mutex.lock(); + self.submission_queue.append(&conn.node); + self.submission_mutex.unlock(); + self.wakeupPoll(); +} + +fn wakeupPoll(self: *Runtime) void { + _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; +} + +fn drainQueue(self: *Runtime) void { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + + while (self.submission_queue.popFirst()) |node| { + const conn: *net_http.Connection = @fieldParentPtr("node", node); + conn.setPrivate(conn) catch |err| { + lp.log.err(.app, "curl set private", .{ .err = err }); + self.releaseConnection(conn); + continue; + }; + libcurl.curl_multi_add_handle(self.multi, conn.easy) catch |err| { + lp.log.err(.app, "curl multi add", .{ .err = err }); + self.releaseConnection(conn); + }; + } +} + pub fn stop(self: *Runtime) void { self.shutdown.store(true, .release); - _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; + self.wakeupPoll(); +} + +fn acceptConnections(self: *Runtime) void { + if (self.shutdown.load(.acquire)) { + return; + } + const listener = self.listener orelse return; + + while (true) { + const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { + switch (err) { + error.WouldBlock => break, + error.SocketNotListening => { + self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; + self.listener = null; + return; + }, + error.ConnectionAborted => { + lp.log.warn(.app, "accept connection aborted", .{}); + continue; + }, + else => { + lp.log.err(.app, "accept error", .{ .err = err }); + continue; + }, + } + }; + + listener.onAccept(listener.ctx, socket); + } +} + +fn preparePollFds(self: *Runtime) void { + const curl_fds = self.pollfds[PSEUDO_POLLFDS..]; + @memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 }); + + var fd_count: c_uint = 0; + const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds); + libcurl.curl_multi_waitfds(self.multi, wait_fds, &fd_count) catch |err| { + lp.log.err(.app, "curl waitfds", .{ .err = err }); + }; +} + +fn getCurlTimeout(self: *Runtime) i32 { + var timeout_ms: c_long = -1; + libcurl.curl_multi_timeout(self.multi, &timeout_ms) catch return -1; + return @intCast(@min(timeout_ms, std.math.maxInt(i32))); +} + +fn processCompletions(self: *Runtime) void { + var msgs_in_queue: c_int = 0; + while (libcurl.curl_multi_info_read(self.multi, &msgs_in_queue)) |msg| { + switch (msg.data) { + .done => |maybe_err| { + if (maybe_err) |err| { + lp.log.warn(.app, "curl transfer error", .{ .err = err }); + } + }, + else => continue, + } + + const easy: *libcurl.Curl = msg.easy_handle; + var ptr: *anyopaque = undefined; + libcurl.curl_easy_getinfo(easy, .private, &ptr) catch + lp.assert(false, "curl getinfo private", .{}); + const conn: *net_http.Connection = @ptrCast(@alignCast(ptr)); + + libcurl.curl_multi_remove_handle(self.multi, easy) catch {}; + self.releaseConnection(conn); + } +} + +comptime { + if (@sizeOf(posix.pollfd) != @sizeOf(libcurl.CurlWaitFd)) { + @compileError("pollfd and CurlWaitFd size mismatch"); + } + if (@offsetOf(posix.pollfd, "fd") != @offsetOf(libcurl.CurlWaitFd, "fd") or + @offsetOf(posix.pollfd, "events") != @offsetOf(libcurl.CurlWaitFd, "events") or + @offsetOf(posix.pollfd, "revents") != @offsetOf(libcurl.CurlWaitFd, "revents")) + { + @compileError("pollfd and CurlWaitFd layout mismatch"); + } } pub fn getConnection(self: *Runtime) ?*net_http.Connection { diff --git a/src/network/http.zig b/src/network/http.zig index b0f70375..7916260d 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -386,11 +386,18 @@ pub const Connection = struct { } pub fn reset(self: *const Connection) !void { + try libcurl.curl_easy_setopt(self.easy, .proxy, null); + try libcurl.curl_easy_setopt(self.easy, .http_header, null); + try libcurl.curl_easy_setopt(self.easy, .header_data, null); try libcurl.curl_easy_setopt(self.easy, .header_function, null); + try libcurl.curl_easy_setopt(self.easy, .write_data, null); - try libcurl.curl_easy_setopt(self.easy, .write_function, null); - try libcurl.curl_easy_setopt(self.easy, .proxy, null); + try libcurl.curl_easy_setopt(self.easy, .write_function, discardBody); + } + + fn discardBody(_: [*]const u8, count: usize, len: usize, _: ?*anyopaque) usize { + return count * len; } pub fn setProxy(self: *const Connection, proxy: ?[:0]const u8) !void { diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index b93ecc77..1c6f9f13 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -629,10 +629,13 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .write_function => blk: { const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { .null => null, - .@"fn" => struct { + .@"fn" => |info| struct { fn cb(buffer: [*c]u8, count: usize, len: usize, user: ?*anyopaque) callconv(.c) usize { - const u = user orelse unreachable; - return value(@ptrCast(buffer), count, len, u); + const user_arg = if (@typeInfo(info.params[3].type.?) == .optional) + user + else + user orelse unreachable; + return value(@ptrCast(buffer), count, len, user_arg); } }.cb, else => @compileError("expected Zig function or null for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))), @@ -753,6 +756,15 @@ pub fn curl_multi_poll( try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds)); } +pub fn curl_multi_waitfds(multi: *CurlM, ufds: []CurlWaitFd, fd_count: *c_uint) ErrorMulti!void { + const raw_fds: [*c]c.curl_waitfd = if (ufds.len == 0) null else @ptrCast(ufds.ptr); + try errorMCheck(c.curl_multi_waitfds(multi, raw_fds, @intCast(ufds.len), fd_count)); +} + +pub fn curl_multi_timeout(multi: *CurlM, timeout_ms: *c_long) ErrorMulti!void { + try errorMCheck(c.curl_multi_timeout(multi, timeout_ms)); +} + pub fn curl_multi_info_read(multi: *CurlM, msgs_in_queue: *c_int) ?CurlMsg { const ptr = c.curl_multi_info_read(multi, msgs_in_queue); if (ptr == null) return null; diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 62ac2f99..1e723ee8 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -2,140 +2,95 @@ const std = @import("std"); const builtin = @import("builtin"); const build_config = @import("build_config"); -const Thread = std.Thread; const Allocator = std.mem.Allocator; const log = @import("../log.zig"); const App = @import("../App.zig"); const Config = @import("../Config.zig"); const telemetry = @import("telemetry.zig"); +const Runtime = @import("../network/Runtime.zig"); const Connection = @import("../network/http.zig").Connection; const URL = "https://telemetry.lightpanda.io"; const MAX_BATCH_SIZE = 20; -pub const LightPanda = struct { - running: bool, - thread: ?std.Thread, - allocator: Allocator, - mutex: std.Thread.Mutex, - cond: Thread.Condition, - connection: Connection, - config: *const Config, - pending: std.DoublyLinkedList, - mem_pool: std.heap.MemoryPool(LightPandaEvent), +const LightPanda = @This(); - pub fn init(app: *App) !LightPanda { - const connection = try app.network.newConnection(); - errdefer connection.deinit(); +allocator: Allocator, +runtime: *Runtime, +mutex: std.Thread.Mutex = .{}, - try connection.setURL(URL); - try connection.setMethod(.POST); +pcount: usize = 0, +pending: [MAX_BATCH_SIZE * 2]LightPandaEvent = undefined, - const allocator = app.allocator; - return .{ - .cond = .{}, - .mutex = .{}, - .pending = .{}, - .thread = null, - .running = true, - .allocator = allocator, - .connection = connection, - .config = app.config, - .mem_pool = std.heap.MemoryPool(LightPandaEvent).init(allocator), - }; - } +pub fn init(app: *App) !LightPanda { + return .{ + .allocator = app.allocator, + .runtime = &app.network, + }; +} - pub fn deinit(self: *LightPanda) void { - if (self.thread) |*thread| { - self.mutex.lock(); - self.running = false; - self.mutex.unlock(); - self.cond.signal(); - thread.join(); - } - self.mem_pool.deinit(); - self.connection.deinit(); - } +pub fn deinit(self: *LightPanda) void { + self.flush(); +} - pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { - const event = try self.mem_pool.create(); - event.* = .{ +pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { + const pending_count = blk: { + self.mutex.lock(); + defer self.mutex.unlock(); + + self.pending[self.pcount] = .{ .iid = iid, .mode = run_mode, .event = raw_event, - .node = .{}, }; + self.pcount += 1; - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.thread == null) { - self.thread = try std.Thread.spawn(.{}, run, .{self}); - } + break :blk self.pcount; + }; - self.pending.append(&event.node); - self.cond.signal(); + if (pending_count >= MAX_BATCH_SIZE) { + self.flush(); + } +} + +pub fn flush(self: *LightPanda) void { + self.postEvent() catch |err| { + log.warn(.telemetry, "flush error", .{ .err = err }); + }; +} + +fn postEvent(self: *LightPanda) !void { + var writer = std.Io.Writer.Allocating.init(self.allocator); + defer writer.deinit(); + + self.mutex.lock(); + defer self.mutex.unlock(); + + const events = self.pending[0..self.pcount]; + self.pcount = 0; + + if (events.len == 0) return; + + for (events) |*event| { + try std.json.Stringify.value(event, .{ .emit_null_optional_fields = false }, &writer.writer); + try writer.writer.writeByte('\n'); } - fn run(self: *LightPanda) void { - var aw = std.Io.Writer.Allocating.init(self.allocator); - defer aw.deinit(); + const conn = self.runtime.getConnection() orelse return; + errdefer self.runtime.releaseConnection(conn); - var batch: [MAX_BATCH_SIZE]*LightPandaEvent = undefined; - self.mutex.lock(); - while (true) { - while (self.pending.first != null) { - const b = self.collectBatch(&batch); - self.mutex.unlock(); - self.postEvent(b, &aw) catch |err| { - log.warn(.telemetry, "post error", .{ .err = err }); - }; - self.mutex.lock(); - } - if (self.running == false) { - return; - } - self.cond.wait(&self.mutex); - } - } + try conn.setURL(URL); + try conn.setMethod(.POST); + try conn.setBody(writer.written()); - fn postEvent(self: *LightPanda, events: []*LightPandaEvent, aw: *std.Io.Writer.Allocating) !void { - defer for (events) |e| { - self.mem_pool.destroy(e); - }; - - defer aw.clearRetainingCapacity(); - for (events) |event| { - try std.json.Stringify.value(event, .{ .emit_null_optional_fields = false }, &aw.writer); - try aw.writer.writeByte('\n'); - } - - try self.connection.setBody(aw.written()); - const status = try self.connection.request(&self.config.http_headers); - - if (status != 200) { - log.warn(.telemetry, "server error", .{ .status = status }); - } - } - - fn collectBatch(self: *LightPanda, into: []*LightPandaEvent) []*LightPandaEvent { - var i: usize = 0; - while (self.pending.popFirst()) |node| { - into[i] = @fieldParentPtr("node", node); - i += 1; - if (i == MAX_BATCH_SIZE) { - break; - } - } - return into[0..i]; - } -}; + self.runtime.submitRequest(conn); +} const LightPandaEvent = struct { iid: ?[]const u8, mode: Config.RunMode, event: telemetry.Event, - node: std.DoublyLinkedList.Node, pub fn jsonStringify(self: *const LightPandaEvent, writer: anytype) !void { try writer.beginObject(); diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index dfdc0200..f92b2193 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -16,7 +16,7 @@ pub fn isDisabled() bool { pub const Telemetry = TelemetryT(blk: { if (builtin.mode == .Debug or builtin.is_test) break :blk NoopProvider; - break :blk @import("lightpanda.zig").LightPanda; + break :blk @import("lightpanda.zig"); }); fn TelemetryT(comptime P: type) type { @@ -50,6 +50,10 @@ fn TelemetryT(comptime P: type) type { }; } + pub fn flush(self: *Self) void { + self.provider.flush(); + } + pub fn deinit(self: *Self) void { self.provider.deinit(); } @@ -118,6 +122,7 @@ const NoopProvider = struct { fn init(_: *App) !NoopProvider { return .{}; } + fn flush(_: NoopProvider) void {} fn deinit(_: NoopProvider) void {} pub fn send(_: NoopProvider, _: ?[]const u8, _: Config.RunMode, _: Event) !void {} }; @@ -192,6 +197,7 @@ const MockProvider = struct { .allocator = app.allocator, }; } + fn flush(_: *MockProvider) void {} fn deinit(self: *MockProvider) void { self.events.deinit(self.allocator); }