diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index b1b606d8..8de17da1 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -60,7 +60,9 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, shutdown: std.atomic.Value(bool) = .init(false), -// Async HTTP requests (e.g. telemetry). Created on demand. +// Multi is a heavy structure that can consume up to 2MB of RAM. +// Currently, Runtime is used sparingly, and we only create it on demand. +// When Runtime becomes truly shared, it should become a regular field. multi: ?*libcurl.CurlM = null, submission_mutex: std.Thread.Mutex = .{}, submission_queue: std.DoublyLinkedList = .{}, @@ -304,6 +306,13 @@ pub fn run(self: *Runtime) void { var drain_buf: [64]u8 = undefined; var running_handles: c_int = 0; + const poll_fd = &self.pollfds[0]; + const listen_fd = &self.pollfds[1]; + + // Please note that receiving a shutdown command does not terminate all connections. + // When gracefully shutting down a server, we at least want to send the remaining + // telemetry, but we stop accepting new connections. It is the responsibility + // of external code to terminate its requests upon shutdown. while (true) { self.drainQueue(); @@ -325,15 +334,15 @@ pub fn run(self: *Runtime) void { }; // check wakeup pipe - if (self.pollfds[0].revents != 0) { - self.pollfds[0].revents = 0; + if (poll_fd.revents != 0) { + poll_fd.revents = 0; while (true) _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; } // accept new connections - if (self.pollfds[1].revents != 0) { - self.pollfds[1].revents = 0; + if (listen_fd.revents != 0) { + listen_fd.revents = 0; self.acceptConnections(); } diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 1e723ee8..ee831ddb 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -12,7 +12,8 @@ const Runtime = @import("../network/Runtime.zig"); const Connection = @import("../network/http.zig").Connection; const URL = "https://telemetry.lightpanda.io"; -const MAX_BATCH_SIZE = 20; +const BATCH_SIZE = 20; +const BUFFER_SIZE = BATCH_SIZE * 2; const LightPanda = @This(); @@ -21,7 +22,7 @@ runtime: *Runtime, mutex: std.Thread.Mutex = .{}, pcount: usize = 0, -pending: [MAX_BATCH_SIZE * 2]LightPandaEvent = undefined, +pending: [BUFFER_SIZE]LightPandaEvent = undefined, pub fn init(app: *App) !LightPanda { return .{ @@ -39,6 +40,11 @@ pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_e self.mutex.lock(); defer self.mutex.unlock(); + if (self.pcount == BUFFER_SIZE) { + log.err(.telemetry, "telemetry buffer exhausted", .{}); + return; + } + self.pending[self.pcount] = .{ .iid = iid, .mode = run_mode, @@ -49,7 +55,7 @@ pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_e break :blk self.pcount; }; - if (pending_count >= MAX_BATCH_SIZE) { + if (pending_count >= BATCH_SIZE) { self.flush(); } } @@ -68,8 +74,6 @@ fn postEvent(self: *LightPanda) !void { defer self.mutex.unlock(); const events = self.pending[0..self.pcount]; - self.pcount = 0; - if (events.len == 0) return; for (events) |*event| { @@ -84,6 +88,7 @@ fn postEvent(self: *LightPanda) !void { try conn.setMethod(.POST); try conn.setBody(writer.written()); + self.pcount = 0; self.runtime.submitRequest(conn); }