Move comments and bound checks

This commit is contained in:
Nikolay Govorov
2026-03-13 12:44:24 +00:00
parent 51fb08e6aa
commit b14ae02548
2 changed files with 24 additions and 10 deletions

View File

@@ -60,7 +60,9 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 },
shutdown: std.atomic.Value(bool) = .init(false), shutdown: std.atomic.Value(bool) = .init(false),
// Async HTTP requests (e.g. telemetry). Created on demand. // Multi is a heavy structure that can consume up to 2MB of RAM.
// Currently, Runtime is used sparingly, and we only create it on demand.
// When Runtime becomes truly shared, it should become a regular field.
multi: ?*libcurl.CurlM = null, multi: ?*libcurl.CurlM = null,
submission_mutex: std.Thread.Mutex = .{}, submission_mutex: std.Thread.Mutex = .{},
submission_queue: std.DoublyLinkedList = .{}, submission_queue: std.DoublyLinkedList = .{},
@@ -304,6 +306,13 @@ pub fn run(self: *Runtime) void {
var drain_buf: [64]u8 = undefined; var drain_buf: [64]u8 = undefined;
var running_handles: c_int = 0; var running_handles: c_int = 0;
const poll_fd = &self.pollfds[0];
const listen_fd = &self.pollfds[1];
// Please note that receiving a shutdown command does not terminate all connections.
// When gracefully shutting down a server, we at least want to send the remaining
// telemetry, but we stop accepting new connections. It is the responsibility
// of external code to terminate its requests upon shutdown.
while (true) { while (true) {
self.drainQueue(); self.drainQueue();
@@ -325,15 +334,15 @@ pub fn run(self: *Runtime) void {
}; };
// check wakeup pipe // check wakeup pipe
if (self.pollfds[0].revents != 0) { if (poll_fd.revents != 0) {
self.pollfds[0].revents = 0; poll_fd.revents = 0;
while (true) while (true)
_ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break;
} }
// accept new connections // accept new connections
if (self.pollfds[1].revents != 0) { if (listen_fd.revents != 0) {
self.pollfds[1].revents = 0; listen_fd.revents = 0;
self.acceptConnections(); self.acceptConnections();
} }

View File

@@ -12,7 +12,8 @@ const Runtime = @import("../network/Runtime.zig");
const Connection = @import("../network/http.zig").Connection; const Connection = @import("../network/http.zig").Connection;
const URL = "https://telemetry.lightpanda.io"; const URL = "https://telemetry.lightpanda.io";
const MAX_BATCH_SIZE = 20; const BATCH_SIZE = 20;
const BUFFER_SIZE = BATCH_SIZE * 2;
const LightPanda = @This(); const LightPanda = @This();
@@ -21,7 +22,7 @@ runtime: *Runtime,
mutex: std.Thread.Mutex = .{}, mutex: std.Thread.Mutex = .{},
pcount: usize = 0, pcount: usize = 0,
pending: [MAX_BATCH_SIZE * 2]LightPandaEvent = undefined, pending: [BUFFER_SIZE]LightPandaEvent = undefined,
pub fn init(app: *App) !LightPanda { pub fn init(app: *App) !LightPanda {
return .{ return .{
@@ -39,6 +40,11 @@ pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_e
self.mutex.lock(); self.mutex.lock();
defer self.mutex.unlock(); defer self.mutex.unlock();
if (self.pcount == BUFFER_SIZE) {
log.err(.telemetry, "telemetry buffer exhausted", .{});
return;
}
self.pending[self.pcount] = .{ self.pending[self.pcount] = .{
.iid = iid, .iid = iid,
.mode = run_mode, .mode = run_mode,
@@ -49,7 +55,7 @@ pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_e
break :blk self.pcount; break :blk self.pcount;
}; };
if (pending_count >= MAX_BATCH_SIZE) { if (pending_count >= BATCH_SIZE) {
self.flush(); self.flush();
} }
} }
@@ -68,8 +74,6 @@ fn postEvent(self: *LightPanda) !void {
defer self.mutex.unlock(); defer self.mutex.unlock();
const events = self.pending[0..self.pcount]; const events = self.pending[0..self.pcount];
self.pcount = 0;
if (events.len == 0) return; if (events.len == 0) return;
for (events) |*event| { for (events) |*event| {
@@ -84,6 +88,7 @@ fn postEvent(self: *LightPanda) !void {
try conn.setMethod(.POST); try conn.setMethod(.POST);
try conn.setBody(writer.written()); try conn.setBody(writer.written());
self.pcount = 0;
self.runtime.submitRequest(conn); self.runtime.submitRequest(conn);
} }