From cd33a089d15109b6bc32c7799d05eb9936210869 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Wed, 5 Mar 2025 22:57:41 +0800 Subject: [PATCH] flatten events, include aarch + os, remove eid --- src/app.zig | 9 +- src/cdp/testing.zig | 2 +- src/main.zig | 8 +- src/telemetry/lightpanda.zig | 251 +++++++++++++---------------------- src/telemetry/telemetry.zig | 51 +++---- src/unit_tests.zig | 2 +- 6 files changed, 124 insertions(+), 199 deletions(-) diff --git a/src/app.zig b/src/app.zig index ea08043d..1249ccdf 100644 --- a/src/app.zig +++ b/src/app.zig @@ -4,6 +4,11 @@ const Loop = @import("jsruntime").Loop; const Allocator = std.mem.Allocator; const Telemetry = @import("telemetry/telemetry.zig").Telemetry; +pub const RunMode = enum { + serve, + fetch, +}; + // Container for global state / objects that various parts of the system // might need. pub const App = struct { @@ -11,14 +16,14 @@ pub const App = struct { allocator: Allocator, telemetry: Telemetry, - pub fn init(allocator: Allocator) !App { + pub fn init(allocator: Allocator, run_mode: RunMode) !App { const loop = try allocator.create(Loop); errdefer allocator.destroy(loop); loop.* = try Loop.init(allocator); errdefer loop.deinit(); - const telemetry = Telemetry.init(allocator, loop); + const telemetry = Telemetry.init(allocator, loop, run_mode); errdefer telemetry.deinit(); return .{ diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 0e5b1508..5bc5f029 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -263,7 +263,7 @@ const TestContext = struct { pub fn context() TestContext { return .{ - .app = App.init(std.testing.allocator) catch unreachable, + .app = App.init(std.testing.allocator, .serve) catch unreachable, .arena = std.heap.ArenaAllocator.init(std.testing.allocator), }; } diff --git a/src/main.zig b/src/main.zig index d6dd955e..a7d27f6f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -70,9 +70,9 @@ pub fn main() !void { return args.printUsageAndExit(false); }; - var app = try @import("app.zig").App.init(alloc); + var app = try @import("app.zig").App.init(alloc, .serve); defer app.deinit(); - app.telemetry.record(.{ .run = .{ .mode = .serve, .version = version } }); + app.telemetry.record(.{ .run = {} }); const timeout = std.time.ns_per_s * @as(u64, opts.timeout); server.run(&app, address, timeout) catch |err| { @@ -83,9 +83,9 @@ pub fn main() !void { .fetch => |opts| { log.debug("Fetch mode: url {s}, dump {any}", .{ opts.url, opts.dump }); - var app = try @import("app.zig").App.init(alloc); + var app = try @import("app.zig").App.init(alloc, .fetch); defer app.deinit(); - app.telemetry.record(.{ .run = .{ .mode = .fetch, .version = version } }); + app.telemetry.record(.{ .run = {} }); // vm const vm = jsruntime.VM.init(); diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 0c16aeca..d77818ac 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -1,196 +1,127 @@ const std = @import("std"); -const Allocator = std.mem.Allocator; -const ArenAallocator = std.heap.ArenaAllocator; +const builtin = @import("builtin"); +const build_info = @import("build_info"); -const Loop = @import("jsruntime").Loop; -const Client = @import("asyncio").Client; +const Thread = std.Thread; +const Allocator = std.mem.Allocator; + +const telemetry = @import("telemetry.zig"); +const RunMode = @import("../app.zig").RunMode; const log = std.log.scoped(.telemetry); - -const URL = "https://telemetry.lightpanda.io/"; +const URL = "https://telemetry.lightpanda.io"; pub const LightPanda = struct { uri: std.Uri, - io: Client.IO, - client: Client, + pending: List, + running: bool, + thread: ?std.Thread, allocator: Allocator, - sending_pool: std.heap.MemoryPool(Sending), - client_context_pool: std.heap.MemoryPool(Client.Ctx), + mutex: std.Thread.Mutex, + cond: Thread.Condition, + node_pool: std.heap.MemoryPool(List.Node), - pub fn init(allocator: Allocator, loop: *Loop) !LightPanda { + const List = std.DoublyLinkedList(LightPandaEvent); + + pub fn init(allocator: Allocator) !LightPanda { return .{ + .cond = .{}, + .mutex = .{}, + .pending = .{}, + .thread = null, + .running = true, .allocator = allocator, - .io = Client.IO.init(loop), - .client = .{ .allocator = allocator }, .uri = std.Uri.parse(URL) catch unreachable, - .sending_pool = std.heap.MemoryPool(Sending).init(allocator), - .client_context_pool = std.heap.MemoryPool(Client.Ctx).init(allocator), + .node_pool = std.heap.MemoryPool(List.Node).init(allocator), }; } pub fn deinit(self: *LightPanda) void { - self.client.deinit(); - self.sending_pool.deinit(); - self.client_context_pool.deinit(); + if (self.thread) |*thread| { + self.mutex.lock(); + self.running = false; + self.mutex.unlock(); + self.cond.signal(); + thread.join(); + } + self.node_pool.deinit(); } - pub fn send(self: *LightPanda, iid: ?[]const u8, eid: []const u8, event: anytype) !void { - var arena = std.heap.ArenaAllocator.init(self.allocator); - errdefer arena.deinit(); - - const resp_header_buffer = try arena.allocator().alloc(u8, 4096); - const body = try std.json.stringifyAlloc(arena.allocator(), .{ + pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: RunMode, raw_event: telemetry.Event) !void { + const event = LightPandaEvent{ .iid = iid, - .eid = eid, - .event = event, - }, .{}); - - const sending = try self.sending_pool.create(); - errdefer self.sending_pool.destroy(sending); - - sending.* = .{ - .body = body, - .arena = arena, - .lightpanda = self, - .request = try self.client.create(.POST, self.uri, .{ - .server_header_buffer = resp_header_buffer, - }), + .driver = if (std.meta.activeTag(raw_event) == .navigate) "cdp" else null, + .mode = run_mode, + .os = builtin.os.tag, + .arch = builtin.cpu.arch, + .version = build_info.git_commit, + .event = @tagName(std.meta.activeTag(raw_event)), }; - errdefer sending.request.deinit(); - const ctx = try self.client_context_pool.create(); - errdefer self.client_context_pool.destroy(ctx); + self.mutex.lock(); + defer self.mutex.unlock(); + if (self.thread == null) { + self.thread = try std.Thread.spawn(.{}, run, .{self}); + } - ctx.* = try Client.Ctx.init(&self.io, &sending.request); - ctx.userData = sending; - - try self.client.async_open( - .POST, - self.uri, - .{ .server_header_buffer = resp_header_buffer }, - ctx, - onRequestConnect, - ); + const node = try self.node_pool.create(); + errdefer self.node_pool.destroy(node); + node.data = event; + self.pending.append(node); + self.cond.signal(); } - fn handleError(sending: *Sending, ctx: *Client.Ctx, err: anyerror) anyerror!void { - const lightpanda = sending.lightpanda; - - ctx.deinit(); - lightpanda.client_context_pool.destroy(ctx); - - sending.deinit(); - lightpanda.sending_pool.destroy(sending); - log.info("request failure: {}", .{err}); - } - - fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - const sending: *Sending = @ptrCast(@alignCast(ctx.userData)); - res catch |err| return handleError(sending, ctx, err); - - ctx.req.transfer_encoding = .{ .content_length = sending.body.len }; - return ctx.req.async_send(ctx, onRequestSend) catch |err| { - return handleError(sending, ctx, err); - }; - } - - fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - const sending: *Sending = @ptrCast(@alignCast(ctx.userData)); - res catch |err| return handleError(sending, ctx, err); - - return ctx.req.async_writeAll(sending.body, ctx, onRequestWrite) catch |err| { - return handleError(sending, ctx, err); - }; - } - - fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - const sending: *Sending = @ptrCast(@alignCast(ctx.userData)); - res catch |err| return handleError(sending, ctx, err); - return ctx.req.async_finish(ctx, onRequestFinish) catch |err| { - return handleError(sending, ctx, err); - }; - } - - fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - const sending: *Sending = @ptrCast(@alignCast(ctx.userData)); - res catch |err| return handleError(sending, ctx, err); - return ctx.req.async_wait(ctx, onRequestWait) catch |err| { - return handleError(sending, ctx, err); - }; - } - - fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { - const sending: *Sending = @ptrCast(@alignCast(ctx.userData)); - res catch |err| return handleError(sending, ctx, err); - - const lightpanda = sending.lightpanda; + fn run(self: *LightPanda) void { + var arr: std.ArrayListUnmanaged(u8) = .{}; + var client = std.http.Client{ .allocator = self.allocator }; defer { - ctx.deinit(); - lightpanda.client_context_pool.destroy(ctx); - - sending.deinit(); - lightpanda.sending_pool.destroy(sending); + arr.deinit(self.allocator); + client.deinit(); } - var buffer: [2048]u8 = undefined; - const reader = ctx.req.reader(); + self.mutex.lock(); while (true) { - const n = reader.read(&buffer) catch 0; - if (n == 0) { - break; + while (self.pending.popFirst()) |node| { + self.mutex.unlock(); + self.postEvent(&node.data, &client, &arr) catch |err| { + log.warn("Telementry reporting error: {}", .{err}); + }; + self.mutex.lock(); + self.node_pool.destroy(node); } + if (self.running == false) { + return; + } + self.cond.wait(&self.mutex); } - if (ctx.req.response.status != .ok) { - log.info("invalid response: {d}", .{@intFromEnum(ctx.req.response.status)}); + } + + fn postEvent(self: *const LightPanda, event: *const LightPandaEvent, client: *std.http.Client, arr: *std.ArrayListUnmanaged(u8)) !void { + defer arr.clearRetainingCapacity(); + try std.json.stringify(event, .{ .emit_null_optional_fields = false }, arr.writer(self.allocator)); + + var response_header_buffer: [2048]u8 = undefined; + + const result = try client.fetch(.{ + .method = .POST, + .payload = arr.items, + .response_storage = .ignore, + .location = .{ .uri = self.uri }, + .server_header_buffer = &response_header_buffer, + }); + if (result.status != .ok) { + log.warn("server error status: {}", .{result.status}); } } }; -const Sending = struct { - body: []const u8, - request: Client.Request, - lightpanda: *LightPanda, - arena: std.heap.ArenaAllocator, - - pub fn deinit(self: *Sending) void { - self.arena.deinit(); - self.request.deinit(); - } +const LightPandaEvent = struct { + iid: ?[]const u8, + mode: RunMode, + driver: ?[]const u8, + os: std.Target.Os.Tag, + arch: std.Target.Cpu.Arch, + version: []const u8, + event: []const u8, }; - -// // wraps a telemetry event so that we can serialize it to plausible's event endpoint -// const EventWrap = struct { -// iid: ?[]const u8, -// eid: []const u8, -// event: *const Event, - -// pub fn jsonStringify(self: *const EventWrap, jws: anytype) !void { -// try jws.beginObject(); -// try jws.objectField("iid"); -// try jws.write(self.iid); -// try jws.objectField("eid"); -// try jws.write(self.eid); -// try jws.objectField("event"); -// try jws.write(@tagName(self.event.*)); -// try jws.objectField("props"); -// switch (self.event) { -// inline else => |props| try jws.write(props), -// } -// try jws.endObject(); -// } -// }; - -// const testing = std.testing; -// test "telemetry: lightpanda json event" { -// const json = try std.json.stringifyAlloc(testing.allocator, EventWrap{ -// .iid = "1234", -// .eid = "abc!", -// .event = .{ .run = .{ .mode = .serve, .version = "over 9000!" } } -// }, .{}); -// defer testing.allocator.free(json); - -// try testing.expectEqualStrings( -// \\{"event":"run","iid""1234","eid":"abc!","props":{"version":"over 9000!","mode":"serve"}} -// , json); -// } diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index be9c911c..b76d96ef 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -5,6 +5,7 @@ const Allocator = std.mem.Allocator; const Loop = @import("jsruntime").Loop; const uuidv4 = @import("../id.zig").uuidv4; +const RunMode = @import("../app.zig").RunMode; const log = std.log.scoped(.telemetry); const ID_FILE = "lightpanda.id"; @@ -20,24 +21,21 @@ fn TelemetryT(comptime P: type) type { // null on IO error iid: ?[36]u8, - // a "execution" id is an id that represents this specific run - eid: [36]u8, provider: P, disabled: bool, + run_mode: RunMode, + const Self = @This(); - pub fn init(allocator: Allocator, loop: *Loop) Self { + pub fn init(allocator: Allocator, loop: *Loop, run_mode: RunMode) Self { const disabled = std.process.hasEnvVarConstant("LIGHTPANDA_DISABLE_TELEMETRY"); - var eid: [36]u8 = undefined; - uuidv4(&eid); - return .{ - .iid = if (disabled) null else getOrCreateId(), - .eid = eid, .disabled = disabled, + .run_mode = run_mode, + .iid = if (disabled) null else getOrCreateId(), .provider = try P.init(allocator, loop), }; } @@ -51,7 +49,7 @@ fn TelemetryT(comptime P: type) type { return; } const iid: ?[]const u8 = if (self.iid) |*iid| iid else null; - self.provider.send(iid, &self.eid, &event) catch |err| { + self.provider.send(iid, self.run_mode, &event) catch |err| { log.warn("failed to record event: {}", .{err}); }; } @@ -83,19 +81,9 @@ fn getOrCreateId() ?[36]u8 { } pub const Event = union(enum) { - run: Run, + run: void, navigate: void, flag: []const u8, // used for testing - - const Run = struct { - version: []const u8, - mode: RunMode, - - const RunMode = enum { - fetch, - serve, - }; - }; }; const NoopProvider = struct { @@ -103,11 +91,12 @@ const NoopProvider = struct { return .{}; } fn deinit(_: NoopProvider) void {} - pub fn send(_: NoopProvider, _: ?[]const u8, _: []const u8, _: anytype) !void {} + pub fn send(_: NoopProvider, _: ?[]const u8, _: RunMode, _: *const Event) !void {} }; extern fn setenv(name: [*:0]u8, value: [*:0]u8, override: c_int) c_int; extern fn unsetenv(name: [*:0]u8) c_int; + const testing = std.testing; test "telemetry: disabled by environment" { _ = setenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY"), @constCast(""), 0); @@ -118,14 +107,14 @@ test "telemetry: disabled by environment" { return .{}; } fn deinit(_: @This()) void {} - pub fn send(_: @This(), _: ?[]const u8, _: []const u8, _: anytype) !void { + pub fn send(_: @This(), _: ?[]const u8, _: RunMode, _: *const Event) !void { unreachable; } }; - var telemetry = TelemetryT(FailingProvider).init(testing.allocator, undefined); + var telemetry = TelemetryT(FailingProvider).init(testing.allocator, undefined, .serve); defer telemetry.deinit(); - telemetry.record(.{ .run = .{ .mode = .serve, .version = "123" } }); + telemetry.record(.{ .run = {} }); } test "telemetry: getOrCreateId" { @@ -146,7 +135,7 @@ test "telemetry: sends event to provider" { defer std.fs.cwd().deleteFile(ID_FILE) catch {}; std.fs.cwd().deleteFile(ID_FILE) catch {}; - var telemetry = TelemetryT(MockProvider).init(testing.allocator, undefined); + var telemetry = TelemetryT(MockProvider).init(testing.allocator, undefined, .serve); defer telemetry.deinit(); const mock = &telemetry.provider; @@ -162,14 +151,14 @@ test "telemetry: sends event to provider" { const MockProvider = struct { iid: ?[]const u8, - eid: ?[]const u8, + run_mode: ?RunMode, allocator: Allocator, events: std.ArrayListUnmanaged(Event), fn init(allocator: Allocator, _: *Loop) !@This() { return .{ .iid = null, - .eid = null, + .run_mode = null, .events = .{}, .allocator = allocator, }; @@ -177,14 +166,14 @@ const MockProvider = struct { fn deinit(self: *MockProvider) void { self.events.deinit(self.allocator); } - pub fn send(self: *MockProvider, iid: ?[]const u8, eid: []const u8, events: *const Event) !void { + pub fn send(self: *MockProvider, iid: ?[]const u8, run_mode: RunMode, events: *const Event) !void { if (self.iid == null) { - try testing.expectEqual(null, self.eid); + try testing.expectEqual(null, self.run_mode); self.iid = iid.?; - self.eid = eid; + self.run_mode = run_mode; } else { try testing.expectEqualStrings(self.iid.?, iid.?); - try testing.expectEqualStrings(self.eid.?, eid); + try testing.expectEqual(self.run_mode.?, run_mode); } try self.events.append(self.allocator, events.*); } diff --git a/src/unit_tests.zig b/src/unit_tests.zig index 3d625251..1cc9975f 100644 --- a/src/unit_tests.zig +++ b/src/unit_tests.zig @@ -45,7 +45,7 @@ pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; const allocator = gpa.allocator(); - var app = try App.init(allocator); + var app = try App.init(allocator, .serve); defer app.deinit(); const env = Env.init(allocator);