From accf2c0e5eec99396829f317a7790ad57eac4619 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Mon, 3 Mar 2025 19:32:18 +0800 Subject: [PATCH] use async-client for telemetry --- src/app.zig | 5 +- src/main.zig | 23 ++-- src/server.zig | 4 + src/telemetry/lightpanda.zig | 205 ++++++++++++++++++++++++++--------- src/telemetry/telemetry.zig | 63 ++++------- src/unit_tests.zig | 10 +- 6 files changed, 201 insertions(+), 109 deletions(-) diff --git a/src/app.zig b/src/app.zig index a92f3b9f..158ea5d5 100644 --- a/src/app.zig +++ b/src/app.zig @@ -1,5 +1,6 @@ const std = @import("std"); +const Loop = @import("jsruntime").Loop; const Allocator = std.mem.Allocator; const Telemetry = @import("telemetry/telemetry.zig").Telemetry; @@ -8,8 +9,8 @@ const Telemetry = @import("telemetry/telemetry.zig").Telemetry; pub const App = struct { telemetry: Telemetry, - pub fn init(allocator: Allocator) !App { - const telemetry = Telemetry.init(allocator); + pub fn init(allocator: Allocator, loop: *Loop) !App { + const telemetry = Telemetry.init(allocator, loop); errdefer telemetry.deinit(); return .{ diff --git a/src/main.zig b/src/main.zig index 2aaf879c..363ed585 100644 --- a/src/main.zig +++ b/src/main.zig @@ -54,9 +54,6 @@ pub fn main() !void { _ = gpa.detectLeaks(); }; - var app = try @import("app.zig").App.init(alloc); - defer app.deinit(); - var args_arena = std.heap.ArenaAllocator.init(alloc); defer args_arena.deinit(); const args = try parseArgs(args_arena.allocator()); @@ -68,7 +65,6 @@ pub fn main() !void { return std.process.cleanExit(); }, .serve => |opts| { - app.telemetry.record(.{ .run = .{ .mode = .serve, .version = version } }); const address = std.net.Address.parseIp4(opts.host, opts.port) catch |err| { log.err("address (host:port) {any}\n", .{err}); return args.printUsageAndExit(false); @@ -77,24 +73,31 @@ pub fn main() !void { var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); + var app = try @import("app.zig").App.init(alloc, &loop); + defer app.deinit(); + app.telemetry.record(.{ .run = .{ .mode = .serve, .version = version } }); + const timeout = std.time.ns_per_s * @as(u64, opts.timeout); - server.run(alloc, address, timeout, &loop) catch |err| { + server.run(alloc, address, timeout, &loop, &app) catch |err| { log.err("Server error", .{}); return err; }; }, .fetch => |opts| { - app.telemetry.record(.{ .run = .{ .mode = .fetch, .version = version } }); log.debug("Fetch mode: url {s}, dump {any}", .{ opts.url, opts.dump }); - // vm - const vm = jsruntime.VM.init(); - defer vm.deinit(); - // loop var loop = try jsruntime.Loop.init(alloc); defer loop.deinit(); + var app = try @import("app.zig").App.init(alloc, &loop); + defer app.deinit(); + app.telemetry.record(.{ .run = .{ .mode = .fetch, .version = version } }); + + // vm + const vm = jsruntime.VM.init(); + defer vm.deinit(); + // browser var browser = Browser.init(alloc, &loop); defer browser.deinit(); diff --git a/src/server.zig b/src/server.zig index cbb0b500..18bbd112 100644 --- a/src/server.zig +++ b/src/server.zig @@ -34,6 +34,7 @@ const CloseError = jsruntime.IO.CloseError; const CancelError = jsruntime.IO.CancelOneError; const TimeoutError = jsruntime.IO.TimeoutError; +const App = @import("app.zig").App; const CDP = @import("cdp/cdp.zig").CDP; const TimeoutCheck = std.time.ns_per_ms * 100; @@ -48,6 +49,7 @@ const MAX_HTTP_REQUEST_SIZE = 2048; const MAX_MESSAGE_SIZE = 256 * 1024 + 14; const Server = struct { + app: *App, allocator: Allocator, loop: *jsruntime.Loop, @@ -1018,6 +1020,7 @@ pub fn run( address: net.Address, timeout: u64, loop: *jsruntime.Loop, + app: *App, ) !void { // create socket const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; @@ -1043,6 +1046,7 @@ pub fn run( const json_version_response = try buildJSONVersionResponse(allocator, address); var server = Server{ + .app = app, .loop = loop, .timeout = timeout, .listener = listener, diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 92ac0521..7bb1b220 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -2,75 +2,176 @@ const std = @import("std"); const Allocator = std.mem.Allocator; const ArenAallocator = std.heap.ArenaAllocator; -const Event = @import("telemetry.zig").Event; +const Loop = @import("jsruntime").Loop; +const Client = @import("asyncio").Client; + const log = std.log.scoped(.telemetry); -const URL = "https://lightpanda.io/browser-stats"; +const URL = "https://stats.lightpanda.io"; -pub const Lightpanda = struct { +pub const LightPanda = struct { uri: std.Uri, - arena: ArenAallocator, - client: std.http.Client, - headers: [1]std.http.Header, + io: Client.IO, + client: Client, + allocator: Allocator, + sending_pool: std.heap.MemoryPool(Sending), + client_context_pool: std.heap.MemoryPool(Client.Ctx), - pub fn init(allocator: Allocator) !Lightpanda { + pub fn init(allocator: Allocator, loop: *Loop) !LightPanda { return .{ + .allocator = allocator, + .io = Client.IO.init(loop), .client = .{ .allocator = allocator }, - .arena = std.heap.ArenaAllocator.init(allocator), .uri = std.Uri.parse(URL) catch unreachable, - .headers = [1]std.http.Header{ - .{ .name = "Content-Type", .value = "application/json" }, - }, + .sending_pool = std.heap.MemoryPool(Sending).init(allocator), + .client_context_pool = std.heap.MemoryPool(Client.Ctx).init(allocator), }; } - pub fn deinit(self: *Lightpanda) void { - self.arena.deinit(); + pub fn deinit(self: *LightPanda) void { self.client.deinit(); + self.sending_pool.deinit(); + self.client_context_pool.deinit(); } - pub fn send(self: *Lightpanda, iid: ?[]const u8, eid: []const u8, events: []Event) !void { - _ = self; - _ = iid; - _ = eid; - _ = events; - // defer _ = self.arena.reset(.{ .retain_capacity = {} }); - // const body = try std.json.stringifyAlloc(self.arena.allocator(), PlausibleEvent{ .event = event }, .{}); + pub fn send(self: *LightPanda, iid: ?[]const u8, eid: []const u8, event: anytype) !void { + var arena = std.heap.ArenaAllocator.init(self.allocator); + errdefer arena.deinit(); - // var server_headers: [2048]u8 = undefined; - // var req = try self.client.open(.POST, self.uri, .{ - // .redirect_behavior = .not_allowed, - // .extra_headers = &self.headers, - // .server_header_buffer = &server_headers, - // }); - // req.transfer_encoding = .{ .content_length = body.len }; - // try req.send(); + const resp_header_buffer = try arena.allocator().alloc(u8, 4096); + const body = try std.json.stringifyAlloc(arena.allocator(), .{ + .iid = iid, + .eid = eid, + .event = event, + }, .{}); - // try req.writeAll(body); - // try req.finish(); - // try req.wait(); + const sending = try self.sending_pool.create(); + errdefer self.sending_pool.destroy(sending); - // const status = req.response.status; - // if (status != .accepted) { - // log.warn("telemetry '{s}' event error: {d}", .{ @tagName(event), @intFromEnum(status) }); - // } else { - // log.warn("telemetry '{s}' sent", .{@tagName(event)}); - // } + sending.* = .{ + .body = body, + .arena = arena, + .lightpanda = self, + .request = try self.client.create(.POST, self.uri, .{ + .server_header_buffer = resp_header_buffer, + }), + }; + errdefer sending.request.deinit(); + + const ctx = try self.client_context_pool.create(); + errdefer self.client_context_pool.destroy(ctx); + + ctx.* = try Client.Ctx.init(&self.io, &sending.request); + ctx.userData = sending; + + try self.client.async_open( + .POST, + self.uri, + .{ .server_header_buffer = resp_header_buffer }, + ctx, + onRequestConnect, + ); + } + + fn handleError(self: *LightPanda, ctx: *Client.Ctx, err: anyerror) anyerror!void { + ctx.deinit(); + self.client_context_pool.destroy(ctx); + + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + sending.deinit(); + self.sending_pool.destroy(sending); + log.info("request failure: {}", .{err}); + } + + fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + res catch |err| return sending.lightpanda.handleError(ctx, err); + + ctx.req.transfer_encoding = .{ .content_length = sending.body.len }; + return ctx.req.async_send(ctx, onRequestSend) catch |err| { + return sending.lightpanda.handleError(ctx, err); + }; + } + + fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + res catch |err| return sending.lightpanda.handleError(ctx, err); + + return ctx.req.async_writeAll(sending.body, ctx, onRequestWrite) catch |err| { + return sending.lightpanda.handleError(ctx, err); + }; + } + + fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + res catch |err| return sending.lightpanda.handleError(ctx, err); + return ctx.req.async_finish(ctx, onRequestFinish) catch |err| { + return sending.lightpanda.handleError(ctx, err); + }; + } + + fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + res catch |err| return sending.lightpanda.handleError(ctx, err); + return ctx.req.async_wait(ctx, onRequestWait) catch |err| { + return sending.lightpanda.handleError(ctx, err); + }; + } + + fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { + var sending: *Sending = @ptrCast(@alignCast(ctx.userData)); + res catch |err| return sending.lightpanda.handleError(ctx, err); + + const lightpanda = sending.lightpanda; + + defer { + ctx.deinit(); + lightpanda.client_context_pool.destroy(ctx); + + sending.deinit(); + lightpanda.sending_pool.destroy(sending); + } + + var buffer: [2048]u8 = undefined; + const reader = ctx.req.reader(); + while (true) { + const n = reader.read(&buffer) catch 0; + if (n == 0) { + break; + } + } + if (ctx.req.response.status != .ok) { + log.info("invalid response: {d}", .{@intFromEnum(ctx.req.response.status)}); + } } }; -// wraps a telemetry event so that we can serialize it to plausible's event endpoint -// const PlausibleEvent = struct { -// event: Event, +const Sending = struct { + body: []const u8, + request: Client.Request, + lightpanda: *LightPanda, + arena: std.heap.ArenaAllocator, -// pub fn jsonStringify(self: PlausibleEvent, jws: anytype) !void { + pub fn deinit(self: *Sending) void { + self.arena.deinit(); + self.request.deinit(); + } +}; + +// // wraps a telemetry event so that we can serialize it to plausible's event endpoint +// const EventWrap = struct { +// iid: ?[]const u8, +// eid: []const u8, +// event: *const Event, + +// pub fn jsonStringify(self: *const EventWrap, jws: anytype) !void { // try jws.beginObject(); -// try jws.objectField("name"); -// try jws.write(@tagName(self.event)); -// try jws.objectField("url"); -// try jws.write(EVENT_URL); -// try jws.objectField("domain"); -// try jws.write(DOMAIN_KEY); +// try jws.objectField("iid"); +// try jws.write(self.iid); +// try jws.objectField("eid"); +// try jws.write(self.eid); +// try jws.objectField("event"); +// try jws.write(@tagName(self.event.*)); // try jws.objectField("props"); // switch (self.event) { // inline else => |props| try jws.write(props), @@ -80,11 +181,15 @@ pub const Lightpanda = struct { // }; // const testing = std.testing; -// test "plausible: json event" { -// const json = try std.json.stringifyAlloc(testing.allocator, PlausibleEvent{ .event = .{ .run = .{ .mode = .serve, .version = "over 9000!" } } }, .{}); +// test "telemetry: lightpanda json event" { +// const json = try std.json.stringifyAlloc(testing.allocator, EventWrap{ +// .iid = "1234", +// .eid = "abc!", +// .event = .{ .run = .{ .mode = .serve, .version = "over 9000!" } } +// }, .{}); // defer testing.allocator.free(json); // try testing.expectEqualStrings( -// \\{"name":"run","url":"https://lightpanda.io/browser-stats","domain":"localhost","props":{"version":"over 9000!","mode":"serve"}} +// \\{"event":"run","iid""1234","eid":"abc!","props":{"version":"over 9000!","mode":"serve"}} // , json); // } diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index 383a36aa..be9c911c 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -2,17 +2,16 @@ const std = @import("std"); const builtin = @import("builtin"); const Allocator = std.mem.Allocator; + +const Loop = @import("jsruntime").Loop; const uuidv4 = @import("../id.zig").uuidv4; const log = std.log.scoped(.telemetry); - -const BATCH_SIZE = 5; -const BATCH_END = BATCH_SIZE - 1; const ID_FILE = "lightpanda.id"; pub const Telemetry = TelemetryT(blk: { if (builtin.mode == .Debug or builtin.is_test) break :blk NoopProvider; - break :blk @import("lightpanda.zig").Lightpanda; + break :blk @import("lightpanda.zig").LightPanda; }); fn TelemetryT(comptime P: type) type { @@ -25,14 +24,11 @@ fn TelemetryT(comptime P: type) type { eid: [36]u8, provider: P, - // batch of events, pending[0..count] are pending - pending: [BATCH_SIZE]Event, - count: usize, disabled: bool, const Self = @This(); - pub fn init(allocator: Allocator) Self { + pub fn init(allocator: Allocator, loop: *Loop) Self { const disabled = std.process.hasEnvVarConstant("LIGHTPANDA_DISABLE_TELEMETRY"); var eid: [36]u8 = undefined; @@ -41,10 +37,8 @@ fn TelemetryT(comptime P: type) type { return .{ .iid = if (disabled) null else getOrCreateId(), .eid = eid, - .count = 0, - .pending = undefined, .disabled = disabled, - .provider = try P.init(allocator), + .provider = try P.init(allocator, loop), }; } @@ -56,19 +50,10 @@ fn TelemetryT(comptime P: type) type { if (self.disabled) { return; } - - const count = self.count; - self.pending[count] = event; - if (count < BATCH_END) { - self.count = count + 1; - return; - } - const iid: ?[]const u8 = if (self.iid) |*iid| iid else null; - self.provider.send(iid, &self.eid, &self.pending) catch |err| { + self.provider.send(iid, &self.eid, &event) catch |err| { log.warn("failed to record event: {}", .{err}); }; - self.count = 0; } }; } @@ -99,6 +84,7 @@ fn getOrCreateId() ?[36]u8 { pub const Event = union(enum) { run: Run, + navigate: void, flag: []const u8, // used for testing const Run = struct { @@ -113,11 +99,11 @@ pub const Event = union(enum) { }; const NoopProvider = struct { - fn init(_: Allocator) !NoopProvider { + fn init(_: Allocator, _: *Loop) !NoopProvider { return .{}; } fn deinit(_: NoopProvider) void {} - pub fn send(_: NoopProvider, _: ?[]const u8, _: []const u8, _: []Event) !void {} + pub fn send(_: NoopProvider, _: ?[]const u8, _: []const u8, _: anytype) !void {} }; extern fn setenv(name: [*:0]u8, value: [*:0]u8, override: c_int) c_int; @@ -128,16 +114,16 @@ test "telemetry: disabled by environment" { defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); const FailingProvider = struct { - fn init(_: Allocator) !@This() { + fn init(_: Allocator, _: *Loop) !@This() { return .{}; } fn deinit(_: @This()) void {} - pub fn send(_: @This(), _: ?[]const u8, _: []const u8, _: []Event) !void { + pub fn send(_: @This(), _: ?[]const u8, _: []const u8, _: anytype) !void { unreachable; } }; - var telemetry = TelemetryT(FailingProvider).init(testing.allocator); + var telemetry = TelemetryT(FailingProvider).init(testing.allocator, undefined); defer telemetry.deinit(); telemetry.record(.{ .run = .{ .mode = .serve, .version = "123" } }); } @@ -156,32 +142,21 @@ test "telemetry: getOrCreateId" { try testing.expectEqual(false, std.mem.eql(u8, &id1, &id3)); } -test "telemetry: sends batch" { +test "telemetry: sends event to provider" { defer std.fs.cwd().deleteFile(ID_FILE) catch {}; std.fs.cwd().deleteFile(ID_FILE) catch {}; - var telemetry = TelemetryT(MockProvider).init(testing.allocator); + var telemetry = TelemetryT(MockProvider).init(testing.allocator, undefined); defer telemetry.deinit(); const mock = &telemetry.provider; telemetry.record(.{ .flag = "1" }); telemetry.record(.{ .flag = "2" }); telemetry.record(.{ .flag = "3" }); - telemetry.record(.{ .flag = "4" }); - try testing.expectEqual(0, mock.events.items.len); - telemetry.record(.{ .flag = "5" }); - try testing.expectEqual(5, mock.events.items.len); - - telemetry.record(.{ .flag = "6" }); - telemetry.record(.{ .flag = "7" }); - telemetry.record(.{ .flag = "8" }); - telemetry.record(.{ .flag = "9" }); - try testing.expectEqual(5, mock.events.items.len); - telemetry.record(.{ .flag = "a" }); - try testing.expectEqual(10, mock.events.items.len); + try testing.expectEqual(3, mock.events.items.len); for (mock.events.items, 0..) |event, i| { - try testing.expectEqual(i + 1, std.fmt.parseInt(usize, event.flag, 16)); + try testing.expectEqual(i + 1, std.fmt.parseInt(usize, event.flag, 10)); } } @@ -191,7 +166,7 @@ const MockProvider = struct { allocator: Allocator, events: std.ArrayListUnmanaged(Event), - fn init(allocator: Allocator) !@This() { + fn init(allocator: Allocator, _: *Loop) !@This() { return .{ .iid = null, .eid = null, @@ -202,7 +177,7 @@ const MockProvider = struct { fn deinit(self: *MockProvider) void { self.events.deinit(self.allocator); } - pub fn send(self: *MockProvider, iid: ?[]const u8, eid: []const u8, events: []Event) !void { + pub fn send(self: *MockProvider, iid: ?[]const u8, eid: []const u8, events: *const Event) !void { if (self.iid == null) { try testing.expectEqual(null, self.eid); self.iid = iid.?; @@ -211,6 +186,6 @@ const MockProvider = struct { try testing.expectEqualStrings(self.iid.?, iid.?); try testing.expectEqualStrings(self.eid.?, eid); } - try self.events.appendSlice(self.allocator, events); + try self.events.append(self.allocator, events.*); } }; diff --git a/src/unit_tests.zig b/src/unit_tests.zig index d677e5d7..96335131 100644 --- a/src/unit_tests.zig +++ b/src/unit_tests.zig @@ -22,6 +22,7 @@ const parser = @import("netsurf"); const Allocator = std.mem.Allocator; +const App = @import("app.zig").App; const jsruntime = @import("jsruntime"); pub const Types = jsruntime.reflect(@import("generate.zig").Tuple(.{}){}); pub const UserContext = @import("user_context.zig").UserContext; @@ -47,6 +48,9 @@ pub fn main() !void { var loop = try jsruntime.Loop.init(allocator); defer loop.deinit(); + var app = try App.init(allocator, &loop); + defer app.deinit(); + const env = Env.init(allocator); defer env.deinit(allocator); @@ -67,7 +71,7 @@ pub fn main() !void { const cdp_thread = blk: { const address = try std.net.Address.parseIp("127.0.0.1", 9583); - const thread = try std.Thread.spawn(.{}, serveCDP, .{ allocator, address, &loop }); + const thread = try std.Thread.spawn(.{}, serveCDP, .{ allocator, address, &loop, &app }); break :blk thread; }; defer cdp_thread.join(); @@ -349,9 +353,9 @@ fn serveHTTP(address: std.net.Address) !void { } } -fn serveCDP(allocator: Allocator, address: std.net.Address, loop: *jsruntime.Loop) !void { +fn serveCDP(allocator: Allocator, address: std.net.Address, loop: *jsruntime.Loop, app: *App) !void { const server = @import("server.zig"); - server.run(allocator, address, std.time.ns_per_s * 2, loop) catch |err| { + server.run(allocator, address, std.time.ns_per_s * 2, loop, app) catch |err| { std.debug.print("CDP server error: {}", .{err}); return err; };