diff --git a/src/App.zig b/src/App.zig index 9039cec5..d3ba2fec 100644 --- a/src/App.zig +++ b/src/App.zig @@ -67,7 +67,7 @@ pub fn init(allocator: Allocator, config: *const Config) !*App { app.app_dir_path = getAndMakeAppDir(allocator); app.telemetry = try Telemetry.init(app, config.mode); - errdefer app.telemetry.deinit(); + errdefer app.telemetry.deinit(allocator); app.arena_pool = ArenaPool.init(allocator, 512, 1024 * 16); errdefer app.arena_pool.deinit(); @@ -85,7 +85,7 @@ pub fn deinit(self: *App) void { allocator.free(app_dir_path); self.app_dir_path = null; } - self.telemetry.deinit(); + self.telemetry.deinit(allocator); self.network.deinit(); self.snapshot.deinit(); self.platform.deinit(); diff --git a/src/Server.zig b/src/Server.zig index e43313da..d172f6dd 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -64,17 +64,17 @@ pub fn init(app: *App, address: net.Address) !*Server { return self; } -pub fn deinit(self: *Server) void { - // Stop all active clients - { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); +pub fn shutdown(self: *Server) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); - for (self.clients.items) |client| { - client.stop(); - } + for (self.clients.items) |client| { + client.stop(); } +} +pub fn deinit(self: *Server) void { + self.shutdown(); self.joinThreads(); self.clients.deinit(self.allocator); self.clients_pool.deinit(); diff --git a/src/main.zig b/src/main.zig index 1b5ffdb2..640f8231 100644 --- a/src/main.zig +++ b/src/main.zig @@ -79,18 +79,21 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { log.opts.filter_scopes = lfs; } + // must be installed before any other threads + const sighandler = try main_arena.create(SigHandler); + sighandler.* = .{ .arena = main_arena }; + try sighandler.install(); + // _app is global to handle graceful shutdown. var app = try App.init(allocator, &args); - defer app.deinit(); + + try sighandler.on(lp.Network.stop, .{&app.network}); + app.telemetry.record(.{ .run = {} }); switch (args.mode) { .serve => |opts| { - const sighandler = try main_arena.create(SigHandler); - sighandler.* = .{ .arena = main_arena }; - try sighandler.install(); - log.debug(.app, "startup", .{ .mode = "serve", .snapshot = app.snapshot.fromEmbedded() }); const address = std.net.Address.parseIp(opts.host, opts.port) catch |err| { log.fatal(.app, "invalid server address", .{ .err = err, .host = opts.host, .port = opts.port }); @@ -112,7 +115,8 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { }; defer server.deinit(); - try sighandler.on(lp.Network.stop, .{&app.network}); + try sighandler.on(lp.Server.shutdown, .{server}); + app.network.run(); }, .fetch => |opts| { @@ -135,10 +139,10 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { fetch_opts.writer = &writer.interface; } - lp.fetch(app, url, fetch_opts) catch |err| { - log.fatal(.app, "fetch error", .{ .err = err, .url = url }); - return err; - }; + var worker_thread = try std.Thread.spawn(.{}, fetchThread, .{ app, url, fetch_opts }); + defer worker_thread.join(); + + app.network.run(); }, .mcp => { log.info(.mcp, "starting server", .{}); @@ -150,11 +154,27 @@ fn run(allocator: Allocator, main_arena: Allocator) !void { var mcp_server: *lp.mcp.Server = try .init(allocator, app, &stdout.interface); defer mcp_server.deinit(); - var stdin_buf: [64 * 1024]u8 = undefined; - var stdin = std.fs.File.stdin().reader(&stdin_buf); + var worker_thread = try std.Thread.spawn(.{}, mcpThread, .{ mcp_server, app }); + defer worker_thread.join(); - try lp.mcp.router.processRequests(mcp_server, &stdin.interface); + app.network.run(); }, else => unreachable, } } + +fn fetchThread(app: *App, url: [:0]const u8, fetch_opts: lp.FetchOpts) void { + defer app.network.stop(); + lp.fetch(app, url, fetch_opts) catch |err| { + log.fatal(.app, "fetch error", .{ .err = err, .url = url }); + }; +} + +fn mcpThread(mcp_server: *lp.mcp.Server, app: *App) void { + defer app.network.stop(); + var stdin_buf: [64 * 1024]u8 = undefined; + var stdin = std.fs.File.stdin().reader(&stdin_buf); + lp.mcp.router.processRequests(mcp_server, &stdin.interface) catch |err| { + log.fatal(.mcp, "mcp error", .{ .err = err }); + }; +} diff --git a/src/network/Runtime.zig b/src/network/Runtime.zig index 196e2aa2..72aebe81 100644 --- a/src/network/Runtime.zig +++ b/src/network/Runtime.zig @@ -38,6 +38,11 @@ const Listener = struct { onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, }; +// Number of fixed pollfds entries (wakeup pipe + listener). +const PSEUDO_POLLFDS = 2; + +const MAX_TICK_CALLBACKS = 16; + allocator: Allocator, config: *const Config, @@ -57,6 +62,22 @@ wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, shutdown: std.atomic.Value(bool) = .init(false), +// Multi is a heavy structure that can consume up to 2MB of RAM. +// Currently, Runtime is used sparingly, and we only create it on demand. +// When Runtime becomes truly shared, it should become a regular field. +multi: ?*libcurl.CurlM = null, +submission_mutex: std.Thread.Mutex = .{}, +submission_queue: std.DoublyLinkedList = .{}, + +callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined, +callbacks_len: usize = 0, +callbacks_mutex: std.Thread.Mutex = .{}, + +const TickCallback = struct { + ctx: *anyopaque, + fun: *const fn (*anyopaque) void, +}; + const ZigToCurlAllocator = struct { // C11 requires malloc to return memory aligned to max_align_t (16 bytes on x86_64). // We match this guarantee since libcurl expects malloc-compatible alignment. @@ -185,8 +206,8 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - // 0 is wakeup, 1 is listener - const pollfds = try allocator.alloc(posix.pollfd, 2); + // 0 is wakeup, 1 is listener, rest for curl fds + const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); errdefer allocator.free(pollfds); @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); @@ -216,16 +237,23 @@ pub fn init(allocator: Allocator, config: *const Config) !Runtime { .allocator = allocator, .config = config, .ca_blob = ca_blob, - .robot_store = RobotStore.init(allocator), - .connections = connections, - .available = available, - .web_bot_auth = web_bot_auth, + .pollfds = pollfds, .wakeup_pipe = pipe, + + .available = available, + .connections = connections, + + .robot_store = RobotStore.init(allocator), + .web_bot_auth = web_bot_auth, }; } pub fn deinit(self: *Runtime) void { + if (self.multi) |multi| { + libcurl.curl_multi_cleanup(multi) catch {}; + } + for (&self.wakeup_pipe) |*fd| { if (fd.* >= 0) { posix.close(fd.*); @@ -285,45 +313,105 @@ pub fn bind( }; } -pub fn run(self: *Runtime) void { - while (!self.shutdown.load(.acquire)) { - const listener = self.listener orelse return; +pub fn onTick(self: *Runtime, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void { + self.callbacks_mutex.lock(); + defer self.callbacks_mutex.unlock(); - _ = posix.poll(self.pollfds, -1) catch |err| { + lp.assert(self.callbacks_len < MAX_TICK_CALLBACKS, "too many ticks", .{}); + + self.callbacks[self.callbacks_len] = .{ + .ctx = ctx, + .fun = callback, + }; + self.callbacks_len += 1; + + self.wakeupPoll(); +} + +pub fn fireTicks(self: *Runtime) void { + self.callbacks_mutex.lock(); + defer self.callbacks_mutex.unlock(); + + for (self.callbacks[0..self.callbacks_len]) |*callback| { + callback.fun(callback.ctx); + } +} + +pub fn run(self: *Runtime) void { + var drain_buf: [64]u8 = undefined; + var running_handles: c_int = 0; + + const poll_fd = &self.pollfds[0]; + const listen_fd = &self.pollfds[1]; + + // Please note that receiving a shutdown command does not terminate all connections. + // When gracefully shutting down a server, we at least want to send the remaining + // telemetry, but we stop accepting new connections. It is the responsibility + // of external code to terminate its requests upon shutdown. + while (true) { + self.drainQueue(); + + if (self.multi) |multi| { + // Kickstart newly added handles (DNS/connect) so that + // curl registers its sockets before we poll. + libcurl.curl_multi_perform(multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); + }; + + self.preparePollFds(multi); + } + + // for ontick to work, you need to wake up periodically + const timeout = blk: { + const min_timeout = 250; // 250ms + if (self.multi == null) { + break :blk min_timeout; + } + + const curl_timeout = self.getCurlTimeout(); + if (curl_timeout == 0) { + break :blk min_timeout; + } + + break :blk @min(min_timeout, curl_timeout); + }; + + _ = posix.poll(self.pollfds, timeout) catch |err| { lp.log.err(.app, "poll", .{ .err = err }); continue; }; - // check wakeup socket - if (self.pollfds[0].revents != 0) { - self.pollfds[0].revents = 0; - - // If we were woken up, perhaps everything was cancelled and the iteration can be completed. - if (self.shutdown.load(.acquire)) break; + // check wakeup pipe + if (poll_fd.revents != 0) { + poll_fd.revents = 0; + while (true) + _ = posix.read(self.wakeup_pipe[0], &drain_buf) catch break; } - // check new connections; - if (self.pollfds[1].revents == 0) continue; - self.pollfds[1].revents = 0; + // accept new connections + if (listen_fd.revents != 0) { + listen_fd.revents = 0; + self.acceptConnections(); + } - const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { - switch (err) { - error.SocketNotListening => { - self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; - self.listener = null; - }, - error.ConnectionAborted => { - lp.log.warn(.app, "accept connection aborted", .{}); - }, - error.WouldBlock => {}, - else => { - lp.log.err(.app, "accept", .{ .err = err }); - }, - } - continue; - }; + if (self.multi) |multi| { + // Drive transfers and process completions. + libcurl.curl_multi_perform(multi, &running_handles) catch |err| { + lp.log.err(.app, "curl perform", .{ .err = err }); + }; + self.processCompletions(multi); + } - listener.onAccept(listener.ctx, socket); + self.fireTicks(); + + if (self.shutdown.load(.acquire) and running_handles == 0) { + // Check if fireTicks submitted new requests (e.g. telemetry flush). + // If so, continue the loop to drain and send them before exiting. + self.submission_mutex.lock(); + const has_pending = self.submission_queue.first != null; + self.submission_mutex.unlock(); + if (!has_pending) break; + } } if (self.listener) |listener| { @@ -340,9 +428,132 @@ pub fn run(self: *Runtime) void { } } +pub fn submitRequest(self: *Runtime, conn: *net_http.Connection) void { + self.submission_mutex.lock(); + self.submission_queue.append(&conn.node); + self.submission_mutex.unlock(); + self.wakeupPoll(); +} + +fn wakeupPoll(self: *Runtime) void { + _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; +} + +fn drainQueue(self: *Runtime) void { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + + if (self.submission_queue.first == null) return; + + const multi = self.multi orelse blk: { + const m = libcurl.curl_multi_init() orelse { + lp.assert(false, "curl multi init failed", .{}); + unreachable; + }; + self.multi = m; + break :blk m; + }; + + while (self.submission_queue.popFirst()) |node| { + const conn: *net_http.Connection = @fieldParentPtr("node", node); + conn.setPrivate(conn) catch |err| { + lp.log.err(.app, "curl set private", .{ .err = err }); + self.releaseConnection(conn); + continue; + }; + libcurl.curl_multi_add_handle(multi, conn.easy) catch |err| { + lp.log.err(.app, "curl multi add", .{ .err = err }); + self.releaseConnection(conn); + }; + } +} + pub fn stop(self: *Runtime) void { self.shutdown.store(true, .release); - _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; + self.wakeupPoll(); +} + +fn acceptConnections(self: *Runtime) void { + if (self.shutdown.load(.acquire)) { + return; + } + const listener = self.listener orelse return; + + while (true) { + const socket = posix.accept(listener.socket, null, null, posix.SOCK.NONBLOCK) catch |err| { + switch (err) { + error.WouldBlock => break, + error.SocketNotListening => { + self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; + self.listener = null; + return; + }, + error.ConnectionAborted => { + lp.log.warn(.app, "accept connection aborted", .{}); + continue; + }, + else => { + lp.log.err(.app, "accept error", .{ .err = err }); + continue; + }, + } + }; + + listener.onAccept(listener.ctx, socket); + } +} + +fn preparePollFds(self: *Runtime, multi: *libcurl.CurlM) void { + const curl_fds = self.pollfds[PSEUDO_POLLFDS..]; + @memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 }); + + var fd_count: c_uint = 0; + const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds); + libcurl.curl_multi_waitfds(multi, wait_fds, &fd_count) catch |err| { + lp.log.err(.app, "curl waitfds", .{ .err = err }); + }; +} + +fn getCurlTimeout(self: *Runtime) i32 { + const multi = self.multi orelse return -1; + var timeout_ms: c_long = -1; + libcurl.curl_multi_timeout(multi, &timeout_ms) catch return -1; + return @intCast(@min(timeout_ms, std.math.maxInt(i32))); +} + +fn processCompletions(self: *Runtime, multi: *libcurl.CurlM) void { + var msgs_in_queue: c_int = 0; + while (libcurl.curl_multi_info_read(multi, &msgs_in_queue)) |msg| { + switch (msg.data) { + .done => |maybe_err| { + if (maybe_err) |err| { + lp.log.warn(.app, "curl transfer error", .{ .err = err }); + } + }, + else => continue, + } + + const easy: *libcurl.Curl = msg.easy_handle; + var ptr: *anyopaque = undefined; + libcurl.curl_easy_getinfo(easy, .private, &ptr) catch + lp.assert(false, "curl getinfo private", .{}); + const conn: *net_http.Connection = @ptrCast(@alignCast(ptr)); + + libcurl.curl_multi_remove_handle(multi, easy) catch {}; + self.releaseConnection(conn); + } +} + +comptime { + if (@sizeOf(posix.pollfd) != @sizeOf(libcurl.CurlWaitFd)) { + @compileError("pollfd and CurlWaitFd size mismatch"); + } + if (@offsetOf(posix.pollfd, "fd") != @offsetOf(libcurl.CurlWaitFd, "fd") or + @offsetOf(posix.pollfd, "events") != @offsetOf(libcurl.CurlWaitFd, "events") or + @offsetOf(posix.pollfd, "revents") != @offsetOf(libcurl.CurlWaitFd, "revents")) + { + @compileError("pollfd and CurlWaitFd layout mismatch"); + } } pub fn getConnection(self: *Runtime) ?*net_http.Connection { diff --git a/src/network/http.zig b/src/network/http.zig index b0f70375..7916260d 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -386,11 +386,18 @@ pub const Connection = struct { } pub fn reset(self: *const Connection) !void { + try libcurl.curl_easy_setopt(self.easy, .proxy, null); + try libcurl.curl_easy_setopt(self.easy, .http_header, null); + try libcurl.curl_easy_setopt(self.easy, .header_data, null); try libcurl.curl_easy_setopt(self.easy, .header_function, null); + try libcurl.curl_easy_setopt(self.easy, .write_data, null); - try libcurl.curl_easy_setopt(self.easy, .write_function, null); - try libcurl.curl_easy_setopt(self.easy, .proxy, null); + try libcurl.curl_easy_setopt(self.easy, .write_function, discardBody); + } + + fn discardBody(_: [*]const u8, count: usize, len: usize, _: ?*anyopaque) usize { + return count * len; } pub fn setProxy(self: *const Connection, proxy: ?[:0]const u8) !void { diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index b93ecc77..1c6f9f13 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -629,10 +629,13 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .write_function => blk: { const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { .null => null, - .@"fn" => struct { + .@"fn" => |info| struct { fn cb(buffer: [*c]u8, count: usize, len: usize, user: ?*anyopaque) callconv(.c) usize { - const u = user orelse unreachable; - return value(@ptrCast(buffer), count, len, u); + const user_arg = if (@typeInfo(info.params[3].type.?) == .optional) + user + else + user orelse unreachable; + return value(@ptrCast(buffer), count, len, user_arg); } }.cb, else => @compileError("expected Zig function or null for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))), @@ -753,6 +756,15 @@ pub fn curl_multi_poll( try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds)); } +pub fn curl_multi_waitfds(multi: *CurlM, ufds: []CurlWaitFd, fd_count: *c_uint) ErrorMulti!void { + const raw_fds: [*c]c.curl_waitfd = if (ufds.len == 0) null else @ptrCast(ufds.ptr); + try errorMCheck(c.curl_multi_waitfds(multi, raw_fds, @intCast(ufds.len), fd_count)); +} + +pub fn curl_multi_timeout(multi: *CurlM, timeout_ms: *c_long) ErrorMulti!void { + try errorMCheck(c.curl_multi_timeout(multi, timeout_ms)); +} + pub fn curl_multi_info_read(multi: *CurlM, msgs_in_queue: *c_int) ?CurlMsg { const ptr = c.curl_multi_info_read(multi, msgs_in_queue); if (ptr == null) return null; diff --git a/src/telemetry/lightpanda.zig b/src/telemetry/lightpanda.zig index 62ac2f99..96587256 100644 --- a/src/telemetry/lightpanda.zig +++ b/src/telemetry/lightpanda.zig @@ -2,140 +2,132 @@ const std = @import("std"); const builtin = @import("builtin"); const build_config = @import("build_config"); -const Thread = std.Thread; const Allocator = std.mem.Allocator; const log = @import("../log.zig"); const App = @import("../App.zig"); const Config = @import("../Config.zig"); const telemetry = @import("telemetry.zig"); +const Runtime = @import("../network/Runtime.zig"); const Connection = @import("../network/http.zig").Connection; const URL = "https://telemetry.lightpanda.io"; -const MAX_BATCH_SIZE = 20; +const BUFFER_SIZE = 1024; +const MAX_BODY_SIZE = 500 * 1024; // 500KB server limit -pub const LightPanda = struct { - running: bool, - thread: ?std.Thread, - allocator: Allocator, - mutex: std.Thread.Mutex, - cond: Thread.Condition, - connection: Connection, - config: *const Config, - pending: std.DoublyLinkedList, - mem_pool: std.heap.MemoryPool(LightPandaEvent), +const LightPanda = @This(); - pub fn init(app: *App) !LightPanda { - const connection = try app.network.newConnection(); - errdefer connection.deinit(); +allocator: Allocator, +runtime: *Runtime, +writer: std.Io.Writer.Allocating, - try connection.setURL(URL); - try connection.setMethod(.POST); +/// Protects concurrent producers in send(). +mutex: std.Thread.Mutex = .{}, - const allocator = app.allocator; - return .{ - .cond = .{}, - .mutex = .{}, - .pending = .{}, - .thread = null, - .running = true, - .allocator = allocator, - .connection = connection, - .config = app.config, - .mem_pool = std.heap.MemoryPool(LightPandaEvent).init(allocator), - }; +iid: ?[36]u8 = null, +run_mode: Config.RunMode = .serve, + +head: std.atomic.Value(usize) = .init(0), +tail: std.atomic.Value(usize) = .init(0), +dropped: std.atomic.Value(usize) = .init(0), +buffer: [BUFFER_SIZE]telemetry.Event = undefined, + +pub fn init(self: *LightPanda, app: *App, iid: ?[36]u8, run_mode: Config.RunMode) !void { + self.* = .{ + .iid = iid, + .run_mode = run_mode, + .allocator = app.allocator, + .runtime = &app.network, + .writer = std.Io.Writer.Allocating.init(app.allocator), + }; + + self.runtime.onTick(@ptrCast(self), flushCallback); +} + +pub fn deinit(self: *LightPanda) void { + self.writer.deinit(); +} + +pub fn send(self: *LightPanda, raw_event: telemetry.Event) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + const t = self.tail.load(.monotonic); + const h = self.head.load(.acquire); + if (t - h >= BUFFER_SIZE) { + _ = self.dropped.fetchAdd(1, .monotonic); + return; } - pub fn deinit(self: *LightPanda) void { - if (self.thread) |*thread| { - self.mutex.lock(); - self.running = false; - self.mutex.unlock(); - self.cond.signal(); - thread.join(); - } - self.mem_pool.deinit(); - self.connection.deinit(); + self.buffer[t % BUFFER_SIZE] = raw_event; + self.tail.store(t + 1, .release); +} + +fn flushCallback(ctx: *anyopaque) void { + const self: *LightPanda = @ptrCast(@alignCast(ctx)); + self.postEvent() catch |err| { + log.warn(.telemetry, "flush error", .{ .err = err }); + }; +} + +fn postEvent(self: *LightPanda) !void { + const conn = self.runtime.getConnection() orelse { + return; + }; + errdefer self.runtime.releaseConnection(conn); + + const h = self.head.load(.monotonic); + const t = self.tail.load(.acquire); + const dropped = self.dropped.swap(0, .monotonic); + + if (h == t and dropped == 0) { + self.runtime.releaseConnection(conn); + return; + } + errdefer _ = self.dropped.fetchAdd(dropped, .monotonic); + + self.writer.clearRetainingCapacity(); + + if (dropped > 0) { + _ = try self.writeEvent(.{ .buffer_overflow = .{ .dropped = dropped } }); } - pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: Config.RunMode, raw_event: telemetry.Event) !void { - const event = try self.mem_pool.create(); - event.* = .{ - .iid = iid, - .mode = run_mode, - .event = raw_event, - .node = .{}, - }; + var sent: usize = 0; + for (h..t) |i| { + const fit = try self.writeEvent(self.buffer[i % BUFFER_SIZE]); + if (!fit) break; - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.thread == null) { - self.thread = try std.Thread.spawn(.{}, run, .{self}); - } - - self.pending.append(&event.node); - self.cond.signal(); + sent += 1; } - fn run(self: *LightPanda) void { - var aw = std.Io.Writer.Allocating.init(self.allocator); - defer aw.deinit(); + try conn.setURL(URL); + try conn.setMethod(.POST); + try conn.setBody(self.writer.written()); - var batch: [MAX_BATCH_SIZE]*LightPandaEvent = undefined; - self.mutex.lock(); - while (true) { - while (self.pending.first != null) { - const b = self.collectBatch(&batch); - self.mutex.unlock(); - self.postEvent(b, &aw) catch |err| { - log.warn(.telemetry, "post error", .{ .err = err }); - }; - self.mutex.lock(); - } - if (self.running == false) { - return; - } - self.cond.wait(&self.mutex); - } + self.head.store(h + sent, .release); + self.runtime.submitRequest(conn); +} + +fn writeEvent(self: *LightPanda, event: telemetry.Event) !bool { + const iid: ?[]const u8 = if (self.iid) |*id| id else null; + const wrapped = LightPandaEvent{ .iid = iid, .mode = self.run_mode, .event = event }; + + const checkpoint = self.writer.written().len; + + try std.json.Stringify.value(&wrapped, .{ .emit_null_optional_fields = false }, &self.writer.writer); + try self.writer.writer.writeByte('\n'); + + if (self.writer.written().len > MAX_BODY_SIZE) { + self.writer.shrinkRetainingCapacity(checkpoint); + return false; } - - fn postEvent(self: *LightPanda, events: []*LightPandaEvent, aw: *std.Io.Writer.Allocating) !void { - defer for (events) |e| { - self.mem_pool.destroy(e); - }; - - defer aw.clearRetainingCapacity(); - for (events) |event| { - try std.json.Stringify.value(event, .{ .emit_null_optional_fields = false }, &aw.writer); - try aw.writer.writeByte('\n'); - } - - try self.connection.setBody(aw.written()); - const status = try self.connection.request(&self.config.http_headers); - - if (status != 200) { - log.warn(.telemetry, "server error", .{ .status = status }); - } - } - - fn collectBatch(self: *LightPanda, into: []*LightPandaEvent) []*LightPandaEvent { - var i: usize = 0; - while (self.pending.popFirst()) |node| { - into[i] = @fieldParentPtr("node", node); - i += 1; - if (i == MAX_BATCH_SIZE) { - break; - } - } - return into[0..i]; - } -}; + return true; +} const LightPandaEvent = struct { iid: ?[]const u8, mode: Config.RunMode, event: telemetry.Event, - node: std.DoublyLinkedList.Node, pub fn jsonStringify(self: *const LightPandaEvent, writer: anytype) !void { try writer.beginObject(); diff --git a/src/telemetry/telemetry.zig b/src/telemetry/telemetry.zig index dfdc0200..e79003f3 100644 --- a/src/telemetry/telemetry.zig +++ b/src/telemetry/telemetry.zig @@ -11,26 +11,21 @@ const uuidv4 = @import("../id.zig").uuidv4; const IID_FILE = "iid"; pub fn isDisabled() bool { + if (builtin.mode == .Debug or builtin.is_test) { + return true; + } + return std.process.hasEnvVarConstant("LIGHTPANDA_DISABLE_TELEMETRY"); } -pub const Telemetry = TelemetryT(blk: { - if (builtin.mode == .Debug or builtin.is_test) break :blk NoopProvider; - break :blk @import("lightpanda.zig").LightPanda; -}); +pub const Telemetry = TelemetryT(@import("lightpanda.zig")); fn TelemetryT(comptime P: type) type { return struct { - // an "install" id that we [try to] persist and re-use between runs - // null on IO error - iid: ?[36]u8, - - provider: P, + provider: *P, disabled: bool, - run_mode: Config.RunMode, - const Self = @This(); pub fn init(app: *App, run_mode: Config.RunMode) !Self { @@ -39,27 +34,29 @@ fn TelemetryT(comptime P: type) type { log.info(.telemetry, "telemetry status", .{ .disabled = disabled }); } - const provider = try P.init(app); - errdefer provider.deinit(); + const iid: ?[36]u8 = if (disabled) null else getOrCreateId(app.app_dir_path); + + const provider = try app.allocator.create(P); + errdefer app.allocator.destroy(provider); + + try P.init(provider, app, iid, run_mode); return .{ .disabled = disabled, - .run_mode = run_mode, .provider = provider, - .iid = if (disabled) null else getOrCreateId(app.app_dir_path), }; } - pub fn deinit(self: *Self) void { + pub fn deinit(self: *Self, allocator: Allocator) void { self.provider.deinit(); + allocator.destroy(self.provider); } pub fn record(self: *Self, event: Event) void { if (self.disabled) { return; } - const iid: ?[]const u8 = if (self.iid) |*iid| iid else null; - self.provider.send(iid, self.run_mode, event) catch |err| { + self.provider.send(event) catch |err| { log.warn(.telemetry, "record error", .{ .err = err, .type = @tagName(std.meta.activeTag(event)) }); }; } @@ -105,6 +102,7 @@ fn getOrCreateId(app_dir_path_: ?[]const u8) ?[36]u8 { pub const Event = union(enum) { run: void, navigate: Navigate, + buffer_overflow: BufferOverflow, flag: []const u8, // used for testing const Navigate = struct { @@ -112,36 +110,35 @@ pub const Event = union(enum) { proxy: bool, driver: []const u8 = "cdp", }; -}; -const NoopProvider = struct { - fn init(_: *App) !NoopProvider { - return .{}; - } - fn deinit(_: NoopProvider) void {} - pub fn send(_: NoopProvider, _: ?[]const u8, _: Config.RunMode, _: Event) !void {} + const BufferOverflow = struct { + dropped: usize, + }; }; extern fn setenv(name: [*:0]u8, value: [*:0]u8, override: c_int) c_int; extern fn unsetenv(name: [*:0]u8) c_int; const testing = @import("../testing.zig"); -test "telemetry: disabled by environment" { +test "telemetry: always disabled in debug builds" { + // Must be disabled regardless of environment variable. + _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); + try testing.expectEqual(true, isDisabled()); + _ = setenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY"), @constCast(""), 0); defer _ = unsetenv(@constCast("LIGHTPANDA_DISABLE_TELEMETRY")); + try testing.expectEqual(true, isDisabled()); const FailingProvider = struct { - fn init(_: *App) !@This() { - return .{}; - } - fn deinit(_: @This()) void {} - pub fn send(_: @This(), _: ?[]const u8, _: Config.RunMode, _: Event) !void { + fn init(_: *@This(), _: *App, _: ?[36]u8, _: Config.RunMode) !void {} + fn deinit(_: *@This()) void {} + pub fn send(_: *@This(), _: Event) !void { unreachable; } }; - var telemetry = try TelemetryT(FailingProvider).init(undefined, .serve); - defer telemetry.deinit(); + var telemetry = try TelemetryT(FailingProvider).init(testing.test_app, .serve); + defer telemetry.deinit(testing.test_app.allocator); telemetry.record(.{ .run = {} }); } @@ -165,8 +162,9 @@ test "telemetry: getOrCreateId" { test "telemetry: sends event to provider" { var telemetry = try TelemetryT(MockProvider).init(testing.test_app, .serve); - defer telemetry.deinit(); - const mock = &telemetry.provider; + defer telemetry.deinit(testing.test_app.allocator); + telemetry.disabled = false; + const mock = telemetry.provider; telemetry.record(.{ .flag = "1" }); telemetry.record(.{ .flag = "2" }); @@ -179,15 +177,11 @@ test "telemetry: sends event to provider" { } const MockProvider = struct { - iid: ?[]const u8, - run_mode: ?Config.RunMode, allocator: Allocator, events: std.ArrayList(Event), - fn init(app: *App) !@This() { - return .{ - .iid = null, - .run_mode = null, + fn init(self: *MockProvider, app: *App, _: ?[36]u8, _: Config.RunMode) !void { + self.* = .{ .events = .{}, .allocator = app.allocator, }; @@ -195,15 +189,7 @@ const MockProvider = struct { fn deinit(self: *MockProvider) void { self.events.deinit(self.allocator); } - pub fn send(self: *MockProvider, iid: ?[]const u8, run_mode: Config.RunMode, events: Event) !void { - if (self.iid == null) { - try testing.expectEqual(null, self.run_mode); - self.iid = iid.?; - self.run_mode = run_mode; - } else { - try testing.expectEqual(self.iid.?, iid.?); - try testing.expectEqual(self.run_mode.?, run_mode); - } - try self.events.append(self.allocator, events); + pub fn send(self: *MockProvider, event: Event) !void { + try self.events.append(self.allocator, event); } };