diff --git a/src/browser/Scheduler.zig b/src/browser/Scheduler.zig index 1bd3e60a..63632fc4 100644 --- a/src/browser/Scheduler.zig +++ b/src/browser/Scheduler.zig @@ -19,116 +19,231 @@ const std = @import("std"); const builtin = @import("builtin"); +const lp = @import("lightpanda"); + +const Page = @import("Page.zig"); + const log = @import("../log.zig"); const milliTimestamp = @import("../datetime.zig").milliTimestamp; +const heap = @import("../heap.zig"); const IS_DEBUG = builtin.mode == .Debug; -const Queue = std.PriorityQueue(Task, void, struct { - fn compare(_: void, a: Task, b: Task) std.math.Order { - const time_order = std.math.order(a.run_at, b.run_at); - if (time_order != .eq) return time_order; - // Break ties with sequence number to maintain FIFO order - return std.math.order(a.sequence, b.sequence); - } -}.compare); - const Scheduler = @This(); - -_sequence: u64, -low_priority: Queue, -high_priority: Queue, +high_priority: Task.Heap, +low_priority: Task.Heap, +task_pool: std.heap.MemoryPool(Task), pub fn init(allocator: std.mem.Allocator) Scheduler { return .{ - ._sequence = 0, - .low_priority = Queue.init(allocator, {}), - .high_priority = Queue.init(allocator, {}), + .task_pool = .init(allocator), + .high_priority = .{ .context = {} }, + .low_priority = .{ .context = {} }, }; } pub fn deinit(self: *Scheduler) void { - finalizeTasks(&self.low_priority); - finalizeTasks(&self.high_priority); + self.task_pool.deinit(); } -const AddOpts = struct { - name: []const u8 = "", - low_priority: bool = false, - finalizer: ?Finalizer = null, +pub const Priority = enum(u1) { low, high }; + +pub const ScheduleOptions = struct { + name: []const u8 = "unspecified", + priority: Priority = .high, + // TODO: Backport finalizer. }; -pub fn add(self: *Scheduler, ctx: *anyopaque, cb: Callback, run_in_ms: u32, opts: AddOpts) !void { + +/// Schedules a task; runs it once. +pub fn once( + self: *Scheduler, + comptime options: ScheduleOptions, + /// Type of `ctx`. + comptime T: type, + ctx: *T, + comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!void, +) !void { if (comptime IS_DEBUG) { - log.debug(.scheduler, "scheduler.add", .{ .name = opts.name, .run_in_ms = run_in_ms, .low_priority = opts.low_priority }); + log.debug(.scheduler, "Scheduler.once", .{ .name = options.name, .priority = @tagName(options.priority) }); } - var queue = if (opts.low_priority) &self.low_priority else &self.high_priority; - const seq = self._sequence + 1; - self._sequence = seq; - return queue.add(.{ + + const runner = struct { + fn runner(task: *Task, scheduler: *Scheduler) void { + // Give type-erased ctx a type. + const typed_ctx: *T = @ptrCast(@alignCast(task.ctx)); + + // Run provided callback; this can fail, we'll ignore it though. + @call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| { + log.warn(.scheduler, "Task.callback", .{ + .name = options.name, + .priority = @tagName(options.priority), + .err = err, + }); + }; + + // Return task back to pool. + scheduler.task_pool.destroy(task); + } + }.runner; + + const task = try self.task_pool.create(); + task.* = .{ + // This variant always have 0-timeout. + .run_at = milliTimestamp(.monotonic), .ctx = ctx, - .callback = cb, - .sequence = seq, - .name = opts.name, - .finalizer = opts.finalizer, + .callback = runner, + }; + + // Push to right heap. + switch (options.priority) { + .low => self.low_priority.insert(task), + .high => self.high_priority.insert(task), + } +} + +/// Action to be taken `after` callback being run. +/// Don't manually create this, prefer such syntax: +/// +/// ```zig +/// .repeat(150); // Repeat after 150ms. +/// .dont_repeat; // Don't repeat. +/// ``` +pub const AfterAction = packed struct(u32) { + /// Whether rerun. + recur: bool, + /// Largest repeat `setInterval` can have is 2147483647ms. + ms: u31, + + pub const dont_repeat = AfterAction{ .recur = false, .ms = 0 }; + + pub inline fn repeat(ms: u31) AfterAction { + return .{ .recur = true, .ms = ms }; + } +}; + +/// Schedules a task that'll be run after given time in ms. +pub fn after( + self: *Scheduler, + comptime options: ScheduleOptions, + /// Type of `ctx`. + comptime T: type, + ctx: *T, + run_in_ms: u32, + /// If an integer is returned, the task will be repeated after that much ms. + /// If null is returned, task won't be repeated. + comptime callback: *const fn (scheduler: *Scheduler, ctx: *T) anyerror!AfterAction, +) !void { + if (comptime IS_DEBUG) { + log.debug(.scheduler, "Scheduler.after", .{ + .name = options.name, + .run_in_ms = run_in_ms, + .priority = @tagName(options.priority), + }); + } + + const runner = struct { + fn runner(task: *Task, scheduler: *Scheduler) void { + // Give type-erased ctx a type. + const typed_ctx: *T = @ptrCast(@alignCast(task.ctx)); + + // Run provided callback; this can fail, we'll ignore it though. + const result = @call(.auto, callback, .{ scheduler, typed_ctx }) catch |err| { + log.warn(.scheduler, "Task.callback", .{ + .name = options.name, + .priority = @tagName(options.priority), + .err = err, + }); + + // Can't repeat w/o return value. + scheduler.task_pool.destroy(task); + return; + }; + + // If task is not repeated, disarm. + if (!result.recur) { + scheduler.task_pool.destroy(task); + return; + } + + // Wants recur. + const repeat_in_ms = result.ms; + // Task cannot be repeated immediately, and they should know that. + lp.assert(repeat_in_ms != 0, "Task.callback: 0-timer", .{ .name = options.name }); + task.run_at = milliTimestamp(.monotonic) + repeat_in_ms; + // Prefer low priority? + scheduler.low_priority.insert(task); + } + }.runner; + + const task = try self.task_pool.create(); + task.* = .{ .run_at = milliTimestamp(.monotonic) + run_in_ms, - }); + .ctx = ctx, + .callback = runner, + }; + + // Push to right heap. + switch (options.priority) { + .low => self.low_priority.insert(task), + .high => self.high_priority.insert(task), + } } pub fn run(self: *Scheduler) !?u64 { - _ = try self.runQueue(&self.low_priority); - return self.runQueue(&self.high_priority); + self.runTasks(.low); + return self.runTasks(.high); } -fn runQueue(self: *Scheduler, queue: *Queue) !?u64 { - if (queue.count() == 0) { - return null; - } +/// Runs events of the desired tree. +fn runTasks(self: *Scheduler, comptime prio: Priority) if (prio == .low) void else ?u64 { + const tree = if (comptime prio == .low) &self.low_priority else &self.high_priority; + + // const cached_time = self.cached_time; + // const now = milliTimestamp(.monotonic); + // Update cache... + // self.cached_time = now; const now = milliTimestamp(.monotonic); - while (queue.peek()) |*task_| { - if (task_.run_at > now) { - return @intCast(task_.run_at - now); - } - var task = queue.remove(); - if (comptime IS_DEBUG) { - log.debug(.scheduler, "scheduler.runTask", .{ .name = task.name }); - } - - const repeat_in_ms = task.callback(task.ctx) catch |err| { - log.warn(.scheduler, "task.callback", .{ .name = task.name, .err = err }); - continue; - }; - - if (repeat_in_ms) |ms| { - // Task cannot be repeated immediately, and they should know that - if (comptime IS_DEBUG) { - std.debug.assert(ms != 0); + while (tree.peek()) |task| { + // No tasks to execute so far. + if (task.run_at > now) { + if (comptime prio == .low) { + break; } - task.run_at = now + ms; - try self.low_priority.add(task); - } - } - return null; -} -fn finalizeTasks(queue: *Queue) void { - var it = queue.iterator(); - while (it.next()) |t| { - if (t.finalizer) |func| { - func(t.ctx); + return task.run_at - now; } + + // Remove from the heap. + const min = tree.deleteMin(); + lp.assert(min.? == task, "Scheduler.runTasks: unexpected", .{}); + + if (comptime IS_DEBUG) { + log.debug(.scheduler, "Scheduler.runTasks", .{ .prio = @tagName(prio) }); + } + + task.callback(task, self); + } + + if (comptime prio == .high) { + return null; } } +/// Internal task representation. const Task = struct { + /// When to execute this task. run_at: u64, - sequence: u64, + /// Userdata. ctx: *anyopaque, - name: []const u8, - callback: Callback, - finalizer: ?Finalizer, -}; + callback: *const fn (task: *Task, scheduler: *Scheduler) void, + heap: heap.IntrusiveField(Task) = .{}, -const Callback = *const fn (ctx: *anyopaque) anyerror!?u32; -const Finalizer = *const fn (ctx: *anyopaque) void; + const Heap = heap.Intrusive(Task, void, Task.less); + + /// Compare 2 tasks by their execution time. + fn less(_: void, a: *const Task, b: *const Task) bool { + return a.run_at < b.run_at; + } +};