implement finalizers

Finalizer allow tasks to cleanup when scheduler deinitialized.
This commit is contained in:
Halil Durak
2026-02-03 15:08:45 +03:00
parent a4595f58b8
commit 03018c28b4

View File

@@ -43,59 +43,127 @@ pub fn init(allocator: std.mem.Allocator) Scheduler {
} }
pub fn deinit(self: *Scheduler) void { pub fn deinit(self: *Scheduler) void {
self.task_pool.deinit(); // Release memory pool after finalization.
defer self.task_pool.deinit();
// Finalize all low-prio tasks.
while (self.low_priority.deleteMin()) |task| {
if (task.on_finalize) |on_finalize| {
@call(.auto, on_finalize, .{ task, self });
} }
}
// Finalize all high-prio tasks.
while (self.high_priority.deleteMin()) |task| {
if (task.on_finalize) |on_finalize| {
@call(.auto, on_finalize, .{ task, self });
}
}
}
/// Scheduled tasks must satisfy this interface.
const ScheduleInterface = struct {
const T = type;
/// Must be implemented if scheduled with `Scheduler.once`.
fn onRun(ctx: *T, scheduler: *Scheduler) !void {
_ = ctx;
_ = scheduler;
}
/// Must be implemented if scheduled with `Scheduler.after`.
fn onReady(ctx: *T, scheduler: *Scheduler) !AfterAction {
_ = ctx;
_ = scheduler;
}
/// Optional, should be implemented if task needs to do some cleanup.
fn onFinalize(ctx: *T) void {
_ = ctx;
}
};
pub const Priority = enum(u1) { low, high }; pub const Priority = enum(u1) { low, high };
pub const ScheduleOptions = struct { pub const ScheduleOptions = struct {
name: []const u8 = "unspecified", name: []const u8 = "unspecified",
priority: Priority = .high, prio: Priority,
// TODO: Backport finalizer.
}; };
/// Schedules a task; runs it once. // scheduler.once(
// .{ .name = "my-event", .priority = .high },
// MyType,
// &my_type,
// struct {
// fn action(my_type: *MyType, scheduler: *Scheduler) !void {
// // action taken logic...
// }
//
// fn finalize(my_type: *MyType) void {
// // finalize logic...
// }
// },
// );
/// Schedules a task that'll be executed in the next run.
pub fn once( pub fn once(
self: *Scheduler, self: *Scheduler,
comptime options: ScheduleOptions, comptime options: ScheduleOptions,
/// Type of `ctx`. /// Type of `ctx`.
comptime T: type, comptime T: type,
ctx: *T, ctx: *T,
comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!void, /// See `Scheduler.ScheduleInterface` (private type) for reference.
comptime Interface: anytype,
) !void { ) !void {
if (comptime IS_DEBUG) { if (comptime IS_DEBUG) {
log.debug(.scheduler, "Scheduler.once", .{ .name = options.name, .priority = @tagName(options.priority) }); log.debug(.scheduler, "Scheduler.once", .{ .name = options.name, .prio = @tagName(options.prio) });
} }
const runner = struct { const action_runner = struct {
fn runner(task: *Task, scheduler: *Scheduler) void { fn runner(task: *Task, scheduler: *Scheduler) void {
// Return task back to pool once done.
defer scheduler.task_pool.destroy(task);
// Give type-erased ctx a type. // Give type-erased ctx a type.
const typed_ctx: *T = @ptrCast(@alignCast(task.ctx)); const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
// Run provided callback; this can fail, we'll ignore it though. // Run provided callback; this can fail, we won't handle it though.
@call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| { @call(.auto, Interface.action, .{ scheduler, typed_ctx }) catch |err| {
log.warn(.scheduler, "Task.callback", .{ log.warn(.scheduler, "Task.action", .{
.name = options.name, .name = options.name,
.priority = @tagName(options.priority), .priority = @tagName(options.prio),
.err = err, .err = err,
}); });
}; };
// Return task back to pool.
scheduler.task_pool.destroy(task);
} }
}.runner; }.runner;
// Finalizer, if provided.
const finalize_runner = blk: {
if (!std.meta.hasFn(Interface, "finalize")) {
break :blk null;
}
break :blk (struct {
fn runner(task: *Task, scheduler: *Scheduler) void {
defer scheduler.task_pool.destroy(task);
const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
@call(.always_inline, Interface.finalize, .{typed_ctx});
}
}).runner;
};
const task = try self.task_pool.create(); const task = try self.task_pool.create();
task.* = .{ task.* = .{
// This variant always have 0-timeout. // This variant always have 0-timeout.
.run_at = milliTimestamp(.monotonic), .run_at = milliTimestamp(.monotonic),
.ctx = ctx, .ctx = ctx,
.callback = runner, .on_action = action_runner,
.on_finalize = finalize_runner,
}; };
// Push to right heap. // Push to right heap.
switch (options.priority) { switch (options.prio) {
.low => self.low_priority.insert(task), .low => self.low_priority.insert(task),
.high => self.high_priority.insert(task), .high => self.high_priority.insert(task),
} }
@@ -129,34 +197,35 @@ pub fn after(
comptime T: type, comptime T: type,
ctx: *T, ctx: *T,
run_in_ms: u32, run_in_ms: u32,
/// If an integer is returned, the task will be repeated after that much ms. /// See `Scheduler.ScheduleInterface` (private type) for reference.
/// If null is returned, task won't be repeated. comptime Interface: anytype,
comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!AfterAction, // If an integer is returned, the task will be repeated after that much ms.
// If null is returned, task won't be repeated.
//comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!AfterAction,
) !void { ) !void {
if (comptime IS_DEBUG) { if (comptime IS_DEBUG) {
log.debug(.scheduler, "Scheduler.after", .{ log.debug(.scheduler, "Scheduler.after", .{
.name = options.name, .name = options.name,
.run_in_ms = run_in_ms, .run_in_ms = run_in_ms,
.priority = @tagName(options.priority), .priority = @tagName(options.prio),
}); });
} }
const runner = struct { const action_runner = struct {
fn runner(task: *Task, scheduler: *Scheduler) void { fn runner(task: *Task, scheduler: *Scheduler) void {
// Give type-erased ctx a type. // Give type-erased ctx a type.
const typed_ctx: *T = @ptrCast(@alignCast(task.ctx)); const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
// Run provided callback; this can fail, we'll ignore it though. // Run provided callback; this can fail, we won't handle it though.
const result = @call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| { const result = @call(.auto, Interface.action, .{ scheduler, typed_ctx }) catch |err| blk: {
log.warn(.scheduler, "Task.callback", .{ log.warn(.scheduler, "Task.action", .{
.name = options.name, .name = options.name,
.priority = @tagName(options.priority), .priority = @tagName(options.prio),
.err = err, .err = err,
}); });
// Can't repeat w/o return value. // Can't repeat w/o return value.
scheduler.task_pool.destroy(task); break :blk AfterAction.dont_repeat;
return;
}; };
// If task is not repeated, disarm. // If task is not repeated, disarm.
@@ -168,22 +237,39 @@ pub fn after(
// Wants recur. // Wants recur.
const repeat_in_ms = result.ms; const repeat_in_ms = result.ms;
// Task cannot be repeated immediately, and they should know that. // Task cannot be repeated immediately, and they should know that.
lp.assert(repeat_in_ms != 0, "Task.callback: 0-timer", .{ .name = options.name }); lp.assert(repeat_in_ms != 0, "Task.action: 0-timer", .{ .name = options.name });
task.run_at = milliTimestamp(.monotonic) + repeat_in_ms; task.run_at = milliTimestamp(.monotonic) + repeat_in_ms;
// Prefer low priority? // Prefer low priority?
scheduler.low_priority.insert(task); scheduler.low_priority.insert(task);
} }
}.runner; }.runner;
// Finalizer, if provided.
const finalize_runner = blk: {
if (!std.meta.hasFn(Interface, "finalize")) {
break :blk null;
}
break :blk (struct {
fn runner(task: *Task, scheduler: *Scheduler) void {
defer scheduler.task_pool.destroy(task);
const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
@call(.always_inline, Interface.finalize, .{typed_ctx});
}
}).runner;
};
const task = try self.task_pool.create(); const task = try self.task_pool.create();
task.* = .{ task.* = .{
.run_at = milliTimestamp(.monotonic) + run_in_ms, .run_at = milliTimestamp(.monotonic) + run_in_ms,
.ctx = ctx, .ctx = ctx,
.callback = runner, .on_action = action_runner,
.on_finalize = finalize_runner,
}; };
// Push to right heap. // Push to right heap.
switch (options.priority) { switch (options.prio) {
.low => self.low_priority.insert(task), .low => self.low_priority.insert(task),
.high => self.high_priority.insert(task), .high => self.high_priority.insert(task),
} }
@@ -196,14 +282,8 @@ pub fn run(self: *Scheduler) !?u64 {
/// Runs events of the desired tree. /// Runs events of the desired tree.
fn runTasks(self: *Scheduler, comptime prio: Priority) if (prio == .low) void else ?u64 { fn runTasks(self: *Scheduler, comptime prio: Priority) if (prio == .low) void else ?u64 {
const tree = if (comptime prio == .low) &self.low_priority else &self.high_priority;
// const cached_time = self.cached_time;
// const now = milliTimestamp(.monotonic);
// Update cache...
// self.cached_time = now;
const now = milliTimestamp(.monotonic); const now = milliTimestamp(.monotonic);
const tree = if (comptime prio == .low) &self.low_priority else &self.high_priority;
while (tree.peek()) |task| { while (tree.peek()) |task| {
// No tasks to execute so far. // No tasks to execute so far.
@@ -223,7 +303,7 @@ fn runTasks(self: *Scheduler, comptime prio: Priority) if (prio == .low) void el
log.debug(.scheduler, "Scheduler.runTasks", .{ .prio = @tagName(prio) }); log.debug(.scheduler, "Scheduler.runTasks", .{ .prio = @tagName(prio) });
} }
task.callback(task, self); @call(.auto, task.on_action, .{ task, self });
} }
if (comptime prio == .high) { if (comptime prio == .high) {
@@ -237,7 +317,9 @@ const Task = struct {
run_at: u64, run_at: u64,
/// Userdata. /// Userdata.
ctx: *anyopaque, ctx: *anyopaque,
callback: *const fn (task: *Task, scheduler: *Scheduler) void, on_action: *const fn (task: *Task, scheduler: *Scheduler) void,
on_finalize: ?*const fn (task: *Task, scheduler: *Scheduler) void = null,
/// Entry in `Heap`.
heap: heap.IntrusiveField(Task) = .{}, heap: heap.IntrusiveField(Task) = .{},
const Heap = heap.Intrusive(Task, void, Task.less); const Heap = heap.Intrusive(Task, void, Task.less);