Used ring buffer for telemetry events buffer

This commit is contained in:
Nikolay Govorov
2026-03-16 23:22:25 +00:00
parent b14ae02548
commit 5fb561dc9c
4 changed files with 155 additions and 106 deletions

View File

@@ -41,6 +41,8 @@ const Listener = struct {
// Number of fixed pollfds entries (wakeup pipe + listener).
const PSEUDO_POLLFDS = 2;
const MAX_TICK_CALLBACKS = 16;
allocator: Allocator,
config: *const Config,
@@ -67,6 +69,15 @@ multi: ?*libcurl.CurlM = null,
submission_mutex: std.Thread.Mutex = .{},
submission_queue: std.DoublyLinkedList = .{},
callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined,
callbacks_len: usize = 0,
callbacks_mutex: std.Thread.Mutex = .{},
const TickCallback = struct {
ctx: *anyopaque,
fun: *const fn (*anyopaque) void,
};
const ZigToCurlAllocator = struct {
// C11 requires malloc to return memory aligned to max_align_t (16 bytes on x86_64).
// We match this guarantee since libcurl expects malloc-compatible alignment.
@@ -302,6 +313,30 @@ pub fn bind(
};
}
pub fn onTick(self: *Runtime, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void {
self.callbacks_mutex.lock();
defer self.callbacks_mutex.unlock();
lp.assert(self.callbacks_len < MAX_TICK_CALLBACKS, "too many ticks", .{});
self.callbacks[self.callbacks_len] = .{
.ctx = ctx,
.fun = callback,
};
self.callbacks_len += 1;
self.wakeupPoll();
}
pub fn fireTicks(self: *Runtime) void {
self.callbacks_mutex.lock();
defer self.callbacks_mutex.unlock();
for (self.callbacks[0..self.callbacks_len]) |*callback| {
callback.fun(callback.ctx);
}
}
pub fn run(self: *Runtime) void {
var drain_buf: [64]u8 = undefined;
var running_handles: c_int = 0;
@@ -326,7 +361,20 @@ pub fn run(self: *Runtime) void {
self.preparePollFds(multi);
}
const timeout = if (self.multi != null) self.getCurlTimeout() else @as(i32, -1);
// for ontick to work, you need to wake up periodically
const timeout = blk: {
const min_timeout = 250; // 250ms
if (self.multi == null) {
break :blk min_timeout;
}
const curl_timeout = self.getCurlTimeout();
if (curl_timeout == 0) {
break :blk min_timeout;
}
break :blk @min(min_timeout, curl_timeout);
};
_ = posix.poll(self.pollfds, timeout) catch |err| {
lp.log.err(.app, "poll", .{ .err = err });
@@ -354,8 +402,16 @@ pub fn run(self: *Runtime) void {
self.processCompletions(multi);
}
if (self.shutdown.load(.acquire) and running_handles == 0)
break;
self.fireTicks();
if (self.shutdown.load(.acquire) and running_handles == 0) {
// Check if fireTicks submitted new requests (e.g. telemetry flush).
// If so, continue the loop to drain and send them before exiting.
self.submission_mutex.lock();
const has_pending = self.submission_queue.first != null;
self.submission_mutex.unlock();
if (!has_pending) break;
}
}
if (self.listener) |listener| {