From 5fb561dc9c1dd824ba0c507ce4c984da6d26a6ae Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Mon, 16 Mar 2026 23:22:25 +0000 Subject: [PATCH] Used ring buffer for telemetry events buffer --- src/App.zig | 4 +- src/network/Runtime.zig | 62 +++++++++++++++++++-- src/telemetry/lightpanda.zig | 101 ++++++++++++++++++++--------------- src/telemetry/telemetry.zig | 94 +++++++++++++------------------- 4 files changed, 155 insertions(+), 106 deletions(-) diff --git a/src/App.zig b/src/App.zig index 9039cec5..d3ba2fec 100644 --- a/src/App.zig +++ b/src/App.zig @@ -67,7 +67,7 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { app.app_dir_path = getAndMakeAppDir(allocator); app.telemetry = try Telemetry.init(app, config.mode); - errdefer app.telemetry.deinit(); + errdefer app.telemetry.deinit(allocator); app.arena_pool = ArenaPool.init(allocator, 512, 1024 * 16); errdefer app.arena_pool.deinit(); @@ -85,7 +85,7 @@ pub fn deinit(self: *App) void { allocator.free(app_dir_path); self.app_dir_path = null; } - self.telemetry.deinit(); + self.telemetry.deinit(allocator); self.network.deinit(); self.snapshot.deinit(); self.platform.deinit(); diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 8de17da1..a47a1948 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -41,6 +41,8 @@ const Listener = struct { // Number of fixed pollfds entries (wakeup pipe + listener). const PSEUDO_POLLFDS = 2; +const MAX_TICK_CALLBACKS = 16; + allocator: Allocator, config: *const Config, @@ -67,6 +69,15 @@ multi: ?*libcurl.CurlM = null, submission_mutex: std.Thread.Mutex = .{}, submission_queue: std.DoublyLinkedList = .{}, +callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined, +callbacks_len: usize = 0, +callbacks_mutex: std.Thread.Mutex = .{}, + +const TickCallback = struct { + ctx: *anyopaque, + fun: *const fn (*anyopaque) void, +}; + const ZigToCurlAllocator = struct { // C11 requires malloc to return memory aligned to max_align_t (16 bytes on x86_64). // We match this guarantee since libcurl expects malloc-compatible alignment. @@ -302,6 +313,30 @@ pub fn bind( }; } +pub fn onTick(self: *Runtime, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void { + self.callbacks_mutex.lock(); + defer self.callbacks_mutex.unlock(); + + lp.assert(self.callbacks_len < MAX_TICK_CALLBACKS, "too many ticks", .{}); + + self.callbacks[self.callbacks_len] = .{ + .ctx = ctx, + .fun = callback, + }; + self.callbacks_len += 1; + + self.wakeupPoll(); +} + +pub fn fireTicks(self: *Runtime) void { + self.callbacks_mutex.lock(); + defer self.callbacks_mutex.unlock(); + + for (self.callbacks[0..self.callbacks_len]) |*callback| { + callback.fun(callback.ctx); + } +} + pub fn run(self: *Runtime) void { var drain_buf: [64]u8 = undefined; var running_handles: c_int = 0; @@ -326,7 +361,20 @@ pub fn run(self: *Runtime) void { self.preparePollFds(multi); } - const timeout = if (self.multi != null) self.getCurlTimeout() else @as(i32, -1); + // for ontick to work, you need to wake up periodically + const timeout = blk: { + const min_timeout = 250; // 250ms + if (self.multi == null) { + break :blk min_timeout; + } + + const curl_timeout = self.getCurlTimeout(); + if (curl_timeout == 0) { + break :blk min_timeout; + } + + break :blk @min(min_timeout, curl_timeout); + }; _ = posix.poll(self.pollfds, timeout) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); @@ -354,8 +402,16 @@ pub fn run(self: *Runtime) void { self.processCompletions(multi); } - if (self.shutdown.load(.acquire) and running_handles == 0) - break; + self.fireTicks(); + + if (self.shutdown.load(.acquire) and running_handles == 0) { + // Check if fireTicks submitted new requests (e.g. telemetry flush). + // If so, continue the loop to drain and send them before exiting. + self.submission_mutex.lock(); + const has_pending = self.submission_queue.first != null; + self.submission_mutex.unlock(); + if (!has_pending) break; + } } if (self.listener) |listener| { diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index ee831ddb..b580570b 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -11,84 +11,97 @@ const telemetry = @import("telemetry.zig"); const Runtime = @import("../network/Runtime.zig"); const Connection = @import("../network/http.zig").Connection; -const URL = "https://telemetry.lightpanda.io"; -const BATCH_SIZE = 20; -const BUFFER_SIZE = BATCH_SIZE * 2; +// const URL = "https://telemetry.lightpanda.io"; +const URL = "http://localhost:9876"; +const BUFFER_SIZE = 1024; const LightPanda = @This(); allocator: Allocator, runtime: *Runtime, + +/// Protects concurrent producers in send(). mutex: std.Thread.Mutex = .{}, -pcount: usize = 0, -pending: [BUFFER_SIZE]LightPandaEvent = undefined, +iid: ?[36]u8 = null, +run_mode: Config.RunMode = .serve, -pub fn init(app: *App) !LightPanda { - return .{ +head: std.atomic.Value(usize) = .init(0), +tail: std.atomic.Value(usize) = .init(0), +dropped: std.atomic.Value(usize) = .init(0), +buffer: [BUFFER_SIZE]telemetry.Event = undefined, + +pub fn init(self: *LightPanda, app: *App, iid: ?[36]u8, run_mode: Config.RunMode) !void { + self.* = .{ .allocator = app.allocator, .runtime = &app.network, - }; -} - -pub fn deinit(self: *LightPanda) void { - self.flush(); -} - -pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { - const pending_count = blk: { - self.mutex.lock(); - defer self.mutex.unlock(); - - if (self.pcount == BUFFER_SIZE) { - log.err(.telemetry, "telemetry buffer exhausted", .{}); - return; - } - - self.pending[self.pcount] = .{ - .iid = iid, - .mode = run_mode, - .event = raw_event, - }; - self.pcount += 1; - - break :blk self.pcount; + .iid = iid, + .run_mode = run_mode, }; - if (pending_count >= BATCH_SIZE) { - self.flush(); + self.runtime.onTick(@ptrCast(self), flushCallback); +} + +pub fn deinit(_: *LightPanda) void {} + +pub fn send(self: *LightPanda, raw_event: telemetry.Event) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + const t = self.tail.load(.monotonic); + const h = self.head.load(.acquire); + if (t - h >= BUFFER_SIZE) { + _ = self.dropped.fetchAdd(1, .monotonic); + return; } + + self.buffer[t % BUFFER_SIZE] = raw_event; + self.tail.store(t + 1, .release); } -pub fn flush(self: *LightPanda) void { +fn flushCallback(ctx: *anyopaque) void { + const self: *LightPanda = @ptrCast(@alignCast(ctx)); self.postEvent() catch |err| { log.warn(.telemetry, "flush error", .{ .err = err }); }; } fn postEvent(self: *LightPanda) !void { + const h = self.head.load(.monotonic); + const t = self.tail.load(.acquire); + const dropped = self.dropped.swap(0, .monotonic); + + if (h == t and dropped == 0) return; + errdefer _ = self.dropped.fetchAdd(dropped, .monotonic); + var writer = std.Io.Writer.Allocating.init(self.allocator); defer writer.deinit(); - self.mutex.lock(); - defer self.mutex.unlock(); + const iid: ?[]const u8 = if (self.iid) |*id| id else null; - const events = self.pending[0..self.pcount]; - if (events.len == 0) return; - - for (events) |*event| { - try std.json.Stringify.value(event, .{ .emit_null_optional_fields = false }, &writer.writer); + for (h..t) |i| { + const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = self.buffer[i % BUFFER_SIZE] }; + try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &writer.writer); try writer.writer.writeByte('\n'); } - const conn = self.runtime.getConnection() orelse return; + if (dropped > 0) { + const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = .{ .buffer_overflow = .{ .dropped = dropped } } }; + try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &writer.writer); + try writer.writer.writeByte('\n'); + } + + const conn = self.runtime.getConnection() orelse { + _ = self.dropped.fetchAdd(dropped, .monotonic); + return; + }; errdefer self.runtime.releaseConnection(conn); try conn.setURL(URL); try conn.setMethod(.POST); try conn.setBody(writer.written()); - self.pcount = 0; + self.head.store(t, .release); self.runtime.submitRequest(conn); } diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index f92b2193..e79003f3 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -11,26 +11,21 @@ const uuidv4 = @import("../id.zig").uuidv4; const IID_FILE = "iid"; pub fn isDisabled() bool { + if (builtin.mode == .Debug or builtin.is_test) { + return true; + } + return std.process.hasEnvVarConstant("LIGHTPANDA_DISABLE_TELEMETRY"); } -pub const Telemetry = TelemetryT(blk: { - if (builtin.mode == .Debug or builtin.is_test) break :blk NoopProvider; - break :blk @import("lightpanda.zig"); -}); +pub const Telemetry = TelemetryT(@import("lightpanda.zig")); fn TelemetryT(comptime P: type) type { return struct { - // an "install" id that we [try to] persist and re-use between runs - // null on IO error - iid: ?[36]u8, - - provider: P, + provider: *P, disabled: bool, - run_mode: Config.RunMode, - const Self = @This(); pub fn init(app: *App, run_mode: Config.RunMode) !Self { @@ -39,31 +34,29 @@ fn TelemetryT(comptime P: type) type { log.info(.telemetry, "telemetry status", .{ .disabled = disabled }); } - const provider = try P.init(app); - errdefer provider.deinit(); + const iid: ?[36]u8 = if (disabled) null else getOrCreateId(app.app_dir_path); + + const provider = try app.allocator.create(P); + errdefer app.allocator.destroy(provider); + + try P.init(provider, app, iid, run_mode); return .{ .disabled = disabled, - .run_mode = run_mode, .provider = provider, - .iid = if (disabled) null else getOrCreateId(app.app_dir_path), }; } - pub fn flush(self: *Self) void { - self.provider.flush(); - } - - pub fn deinit(self: *Self) void { + pub fn deinit(self: *Self, allocator: Allocator) void { self.provider.deinit(); + allocator.destroy(self.provider); } pub fn record(self: *Self, event: Event) void { if (self.disabled) { return; } - const iid: ?[]const u8 = if (self.iid) |*iid| iid else null; - self.provider.send(iid, self.run_mode, event) catch |err| { + self.provider.send(event) catch |err| { log.warn(.telemetry, "record error", .{ .err = err, .type = @tagName(std.meta.activeTag(event)) }); }; } @@ -109,6 +102,7 @@ fn getOrCreateId(app_dir_path_: ?[]const u8) ?[36]u8 { pub const Event = union(enum) { run: void, navigate: Navigate, + buffer_overflow: BufferOverflow, flag: []const u8, // used for testing const Navigate = struct { @@ -116,37 +110,35 @@ pub const Event = union(enum) { proxy: bool, driver: []const u8 = "cdp", }; -}; -const NoopProvider = struct { - fn init(_: *App) !NoopProvider { - return .{}; - } - fn flush(_: NoopProvider) void {} - fn deinit(_: NoopProvider) void {} - pub fn send(_: NoopProvider, _: ?[]const u8, _: Config.RunMode, _: Event) !void {} + const BufferOverflow = struct { + dropped: usize, + }; }; extern fn setenv(name: [*:0]u8, value: [*:0]u8, override: c_int) c_int; extern fn unsetenv(name: [*:0]u8) c_int; const testing = @import("../testing.zig"); -test "telemetry: disabled by environment" { +test "telemetry: always disabled in debug builds" { + // Must be disabled regardless of environment variable. + _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); + try testing.expectEqual(true, isDisabled()); + _ = setenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY"), @constCast(""), 0); defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); + try testing.expectEqual(true, isDisabled()); const FailingProvider = struct { - fn init(_: *App) !@This() { - return .{}; - } - fn deinit(_: @This()) void {} - pub fn send(_: @This(), _: ?[]const u8, _: Config.RunMode, _: Event) !void { + fn init(_: *@This(), _: *App, _: ?[36]u8, _: Config.RunMode) !void {} + fn deinit(_: *@This()) void {} + pub fn send(_: *@This(), _: Event) !void { unreachable; } }; - var telemetry = try TelemetryT(FailingProvider).init(undefined, .serve); - defer telemetry.deinit(); + var telemetry = try TelemetryT(FailingProvider).init(testing.test_app, .serve); + defer telemetry.deinit(testing.test_app.allocator); telemetry.record(.{ .run = {} }); } @@ -170,8 +162,9 @@ test "telemetry: getOrCreateId" { test "telemetry: sends event to provider" { var telemetry = try TelemetryT(MockProvider).init(testing.test_app, .serve); - defer telemetry.deinit(); - const mock = &telemetry.provider; + defer telemetry.deinit(testing.test_app.allocator); + telemetry.disabled = false; + const mock = telemetry.provider; telemetry.record(.{ .flag = "1" }); telemetry.record(.{ .flag = "2" }); @@ -184,32 +177,19 @@ test "telemetry: sends event to provider" { } const MockProvider = struct { - iid: ?[]const u8, - run_mode: ?Config.RunMode, allocator: Allocator, events: std.ArrayList(Event), - fn init(app: *App) !@This() { - return .{ - .iid = null, - .run_mode = null, + fn init(self: *MockProvider, app: *App, _: ?[36]u8, _: Config.RunMode) !void { + self.* = .{ .events = .{}, .allocator = app.allocator, }; } - fn flush(_: *MockProvider) void {} fn deinit(self: *MockProvider) void { self.events.deinit(self.allocator); } - pub fn send(self: *MockProvider, iid: ?[]const u8, run_mode: Config.RunMode, events: Event) !void { - if (self.iid == null) { - try testing.expectEqual(null, self.run_mode); - self.iid = iid.?; - self.run_mode = run_mode; - } else { - try testing.expectEqual(self.iid.?, iid.?); - try testing.expectEqual(self.run_mode.?, run_mode); - } - try self.events.append(self.allocator, events); + pub fn send(self: *MockProvider, event: Event) !void { + try self.events.append(self.allocator, event); } };