rework Scheduler

This commit is contained in:
Halil Durak
2026-02-02 13:32:28 +03:00
parent 24f2cb7cfc
commit b02c0f3656

View File

@@ -19,116 +19,231 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const lp = @import("lightpanda");
const Page = @import("Page.zig");
const log = @import("../log.zig"); const log = @import("../log.zig");
const milliTimestamp = @import("../datetime.zig").milliTimestamp; const milliTimestamp = @import("../datetime.zig").milliTimestamp;
const heap = @import("../heap.zig");
const IS_DEBUG = builtin.mode == .Debug; const IS_DEBUG = builtin.mode == .Debug;
const Queue = std.PriorityQueue(Task, void, struct {
fn compare(_: void, a: Task, b: Task) std.math.Order {
const time_order = std.math.order(a.run_at, b.run_at);
if (time_order != .eq) return time_order;
// Break ties with sequence number to maintain FIFO order
return std.math.order(a.sequence, b.sequence);
}
}.compare);
const Scheduler = @This(); const Scheduler = @This();
high_priority: Task.Heap,
_sequence: u64, low_priority: Task.Heap,
low_priority: Queue, task_pool: std.heap.MemoryPool(Task),
high_priority: Queue,
pub fn init(allocator: std.mem.Allocator) Scheduler { pub fn init(allocator: std.mem.Allocator) Scheduler {
return .{ return .{
._sequence = 0, .task_pool = .init(allocator),
.low_priority = Queue.init(allocator, {}), .high_priority = .{ .context = {} },
.high_priority = Queue.init(allocator, {}), .low_priority = .{ .context = {} },
}; };
} }
pub fn deinit(self: *Scheduler) void { pub fn deinit(self: *Scheduler) void {
finalizeTasks(&self.low_priority); self.task_pool.deinit();
finalizeTasks(&self.high_priority);
} }
const AddOpts = struct { pub const Priority = enum(u1) { low, high };
name: []const u8 = "",
low_priority: bool = false, pub const ScheduleOptions = struct {
finalizer: ?Finalizer = null, name: []const u8 = "unspecified",
priority: Priority = .high,
// TODO: Backport finalizer.
}; };
pub fn add(self: *Scheduler, ctx: *anyopaque, cb: Callback, run_in_ms: u32, opts: AddOpts) !void {
/// Schedules a task; runs it once.
pub fn once(
self: *Scheduler,
comptime options: ScheduleOptions,
/// Type of `ctx`.
comptime T: type,
ctx: *T,
comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!void,
) !void {
if (comptime IS_DEBUG) { if (comptime IS_DEBUG) {
log.debug(.scheduler, "scheduler.add", .{ .name = opts.name, .run_in_ms = run_in_ms, .low_priority = opts.low_priority }); log.debug(.scheduler, "Scheduler.once", .{ .name = options.name, .priority = @tagName(options.priority) });
} }
var queue = if (opts.low_priority) &self.low_priority else &self.high_priority;
const seq = self._sequence + 1; const runner = struct {
self._sequence = seq; fn runner(task: *Task, scheduler: *Scheduler) void {
return queue.add(.{ // Give type-erased ctx a type.
.ctx = ctx, const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
.callback = cb,
.sequence = seq, // Run provided callback; this can fail, we'll ignore it though.
.name = opts.name, @call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| {
.finalizer = opts.finalizer, log.warn(.scheduler, "Task.callback", .{
.run_at = milliTimestamp(.monotonic) + run_in_ms, .name = options.name,
.priority = @tagName(options.priority),
.err = err,
}); });
};
// Return task back to pool.
scheduler.task_pool.destroy(task);
}
}.runner;
const task = try self.task_pool.create();
task.* = .{
// This variant always have 0-timeout.
.run_at = milliTimestamp(.monotonic),
.ctx = ctx,
.callback = runner,
};
// Push to right heap.
switch (options.priority) {
.low => self.low_priority.insert(task),
.high => self.high_priority.insert(task),
}
}
/// Action to be taken `after` callback being run.
/// Don't manually create this, prefer such syntax:
///
/// ```zig
/// .repeat(150); // Repeat after 150ms.
/// .dont_repeat; // Don't repeat.
/// ```
pub const AfterAction = packed struct(u32) {
/// Whether rerun.
recur: bool,
/// Largest repeat `setInterval` can have is 2147483647ms.
ms: u31,
pub const dont_repeat = AfterAction{ .recur = false, .ms = 0 };
pub inline fn repeat(ms: u31) AfterAction {
return .{ .recur = true, .ms = ms };
}
};
/// Schedules a task that'll be run after given time in ms.
pub fn after(
self: *Scheduler,
comptime options: ScheduleOptions,
/// Type of `ctx`.
comptime T: type,
ctx: *T,
run_in_ms: u32,
/// If an integer is returned, the task will be repeated after that much ms.
/// If null is returned, task won't be repeated.
comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!AfterAction,
) !void {
if (comptime IS_DEBUG) {
log.debug(.scheduler, "Scheduler.after", .{
.name = options.name,
.run_in_ms = run_in_ms,
.priority = @tagName(options.priority),
});
}
const runner = struct {
fn runner(task: *Task, scheduler: *Scheduler) void {
// Give type-erased ctx a type.
const typed_ctx: *T = @ptrCast(@alignCast(task.ctx));
// Run provided callback; this can fail, we'll ignore it though.
const result = @call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| {
log.warn(.scheduler, "Task.callback", .{
.name = options.name,
.priority = @tagName(options.priority),
.err = err,
});
// Can't repeat w/o return value.
scheduler.task_pool.destroy(task);
return;
};
// If task is not repeated, disarm.
if (!result.recur) {
scheduler.task_pool.destroy(task);
return;
}
// Wants recur.
const repeat_in_ms = result.ms;
// Task cannot be repeated immediately, and they should know that.
lp.assert(repeat_in_ms != 0, "Task.callback: 0-timer", .{ .name = options.name });
task.run_at = milliTimestamp(.monotonic) + repeat_in_ms;
// Prefer low priority?
scheduler.low_priority.insert(task);
}
}.runner;
const task = try self.task_pool.create();
task.* = .{
.run_at = milliTimestamp(.monotonic) + run_in_ms,
.ctx = ctx,
.callback = runner,
};
// Push to right heap.
switch (options.priority) {
.low => self.low_priority.insert(task),
.high => self.high_priority.insert(task),
}
} }
pub fn run(self: *Scheduler) !?u64 { pub fn run(self: *Scheduler) !?u64 {
_ = try self.runQueue(&self.low_priority); self.runTasks(.low);
return self.runQueue(&self.high_priority); return self.runTasks(.high);
} }
fn runQueue(self: *Scheduler, queue: *Queue) !?u64 { /// Runs events of the desired tree.
if (queue.count() == 0) { fn runTasks(self: *Scheduler, comptime prio: Priority) if (prio == .low) void else ?u64 {
return null; const tree = if (comptime prio == .low) &self.low_priority else &self.high_priority;
}
// const cached_time = self.cached_time;
// const now = milliTimestamp(.monotonic);
// Update cache...
// self.cached_time = now;
const now = milliTimestamp(.monotonic); const now = milliTimestamp(.monotonic);
while (queue.peek()) |*task_| { while (tree.peek()) |task| {
if (task_.run_at > now) { // No tasks to execute so far.
return @intCast(task_.run_at - now); if (task.run_at > now) {
} if (comptime prio == .low) {
var task = queue.remove(); break;
if (comptime IS_DEBUG) {
log.debug(.scheduler, "scheduler.runTask", .{ .name = task.name });
} }
const repeat_in_ms = task.callback(task.ctx) catch |err| { return task.run_at - now;
log.warn(.scheduler, "task.callback", .{ .name = task.name, .err = err }); }
continue;
}; // Remove from the heap.
const min = tree.deleteMin();
lp.assert(min.? == task, "Scheduler.runTasks: unexpected", .{});
if (repeat_in_ms) |ms| {
// Task cannot be repeated immediately, and they should know that
if (comptime IS_DEBUG) { if (comptime IS_DEBUG) {
std.debug.assert(ms != 0); log.debug(.scheduler, "Scheduler.runTasks", .{ .prio = @tagName(prio) });
}
task.run_at = now + ms;
try self.low_priority.add(task);
} }
task.callback(task, self);
} }
if (comptime prio == .high) {
return null; return null;
}
fn finalizeTasks(queue: *Queue) void {
var it = queue.iterator();
while (it.next()) |t| {
if (t.finalizer) |func| {
func(t.ctx);
}
} }
} }
/// Internal task representation.
const Task = struct { const Task = struct {
/// When to execute this task.
run_at: u64, run_at: u64,
sequence: u64, /// Userdata.
ctx: *anyopaque, ctx: *anyopaque,
name: []const u8, callback: *const fn (task: *Task, scheduler: *Scheduler) void,
callback: Callback, heap: heap.IntrusiveField(Task) = .{},
finalizer: ?Finalizer,
};
const Callback = *const fn (ctx: *anyopaque) anyerror!?u32; const Heap = heap.Intrusive(Task, void, Task.less);
const Finalizer = *const fn (ctx: *anyopaque) void;
/// Compare 2 tasks by their execution time.
fn less(_: void, a: *const Task, b: *const Task) bool {
return a.run_at < b.run_at;
}
};