Limit telemetry body size

This commit is contained in:
Nikolay Govorov
2026-03-17 10:31:00 +00:00
parent 5fb561dc9c
commit 9a055a61a6

View File

@@ -11,14 +11,15 @@ const telemetry = @import("telemetry.zig");
const Runtime = @import("../network/Runtime.zig"); const Runtime = @import("../network/Runtime.zig");
const Connection = @import("../network/http.zig").Connection; const Connection = @import("../network/http.zig").Connection;
// const URL = "https://telemetry.lightpanda.io"; const URL = "https://telemetry.lightpanda.io";
const URL = "http://localhost:9876";
const BUFFER_SIZE = 1024; const BUFFER_SIZE = 1024;
const MAX_BODY_SIZE = 500 * 1024; // 500KB server limit
const LightPanda = @This(); const LightPanda = @This();
allocator: Allocator, allocator: Allocator,
runtime: *Runtime, runtime: *Runtime,
writer: std.Io.Writer.Allocating,
/// Protects concurrent producers in send(). /// Protects concurrent producers in send().
mutex: std.Thread.Mutex = .{}, mutex: std.Thread.Mutex = .{},
@@ -33,16 +34,19 @@ buffer: [BUFFER_SIZE]telemetry.Event = undefined,
pub fn init(self: *LightPanda, app: *App, iid: ?[36]u8, run_mode: Config.RunMode) !void { pub fn init(self: *LightPanda, app: *App, iid: ?[36]u8, run_mode: Config.RunMode) !void {
self.* = .{ self.* = .{
.allocator = app.allocator,
.runtime = &app.network,
.iid = iid, .iid = iid,
.run_mode = run_mode, .run_mode = run_mode,
.allocator = app.allocator,
.runtime = &app.network,
.writer = std.Io.Writer.Allocating.init(app.allocator),
}; };
self.runtime.onTick(@ptrCast(self), flushCallback); self.runtime.onTick(@ptrCast(self), flushCallback);
} }
pub fn deinit(_: *LightPanda) void {} pub fn deinit(self: *LightPanda) void {
self.writer.deinit();
}
pub fn send(self: *LightPanda, raw_event: telemetry.Event) !void { pub fn send(self: *LightPanda, raw_event: telemetry.Event) !void {
self.mutex.lock(); self.mutex.lock();
@@ -67,44 +71,59 @@ fn flushCallback(ctx: *anyopaque) void {
} }
fn postEvent(self: *LightPanda) !void { fn postEvent(self: *LightPanda) !void {
const h = self.head.load(.monotonic);
const t = self.tail.load(.acquire);
const dropped = self.dropped.swap(0, .monotonic);
if (h == t and dropped == 0) return;
errdefer _ = self.dropped.fetchAdd(dropped, .monotonic);
var writer = std.Io.Writer.Allocating.init(self.allocator);
defer writer.deinit();
const iid: ?[]const u8 = if (self.iid) |*id| id else null;
for (h..t) |i| {
const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = self.buffer[i % BUFFER_SIZE] };
try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &writer.writer);
try writer.writer.writeByte('\n');
}
if (dropped > 0) {
const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = .{ .buffer_overflow = .{ .dropped = dropped } } };
try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &writer.writer);
try writer.writer.writeByte('\n');
}
const conn = self.runtime.getConnection() orelse { const conn = self.runtime.getConnection() orelse {
_ = self.dropped.fetchAdd(dropped, .monotonic);
return; return;
}; };
errdefer self.runtime.releaseConnection(conn); errdefer self.runtime.releaseConnection(conn);
const h = self.head.load(.monotonic);
const t = self.tail.load(.acquire);
const dropped = self.dropped.swap(0, .monotonic);
if (h == t and dropped == 0) {
self.runtime.releaseConnection(conn);
return;
}
errdefer _ = self.dropped.fetchAdd(dropped, .monotonic);
self.writer.clearRetainingCapacity();
if (dropped > 0) {
_ = try self.writeEvent(.{ .buffer_overflow = .{ .dropped = dropped } });
}
var sent: usize = 0;
for (h..t) |i| {
const fit = try self.writeEvent(self.buffer[i % BUFFER_SIZE]);
if (!fit) break;
sent += 1;
}
try conn.setURL(URL); try conn.setURL(URL);
try conn.setMethod(.POST); try conn.setMethod(.POST);
try conn.setBody(writer.written()); try conn.setBody(self.writer.written());
self.head.store(t, .release); self.head.store(h + sent, .release);
self.runtime.submitRequest(conn); self.runtime.submitRequest(conn);
} }
fn writeEvent(self: *LightPanda, event: telemetry.Event) !bool {
const iid: ?[]const u8 = if (self.iid) |*id| id else null;
const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = event };
const checkpoint = self.writer.written().len;
try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &self.writer.writer);
try self.writer.writer.writeByte('\n');
if (self.writer.written().len > MAX_BODY_SIZE) {
self.writer.shrinkRetainingCapacity(checkpoint);
return false;
}
return true;
}
const LightPandaEvent = struct { const LightPandaEvent = struct {
iid: ?[]const u8, iid: ?[]const u8,
mode: Config.RunMode, mode: Config.RunMode,