MessageChannel and MessagePort

This commit is contained in:
Karl Seguin
2025-11-28 22:11:55 +08:00
parent 8858f889b4
commit 9f587ab24b
9 changed files with 343 additions and 2 deletions

View File

@@ -117,7 +117,7 @@ pub fn dispatch(self: *EventManager, target: *EventTarget, event: *Event) !void
switch (target._type) {
.node => |node| try self.dispatchNode(node, event, &was_handled),
.xhr, .window, .abort_signal, .media_query_list => {
.xhr, .window, .abort_signal, .media_query_list, .message_port => {
const list = self.lookup.getPtr(@intFromPtr(target)) orelse return;
try self.dispatchAll(list, target, event, &was_handled);
},

View File

@@ -702,6 +702,9 @@ fn _wait(self: *Page, wait_ms: u32) !Session.WaitResult {
}
pub fn tick(self: *Page) void {
if (comptime IS_DEBUG) {
log.debug(.page, "tick", .{});
}
_ = self.scheduler.run() catch |err| {
log.err(.page, "tick", .{ .err = err });
};

View File

@@ -26,17 +26,22 @@ const IS_DEBUG = builtin.mode == .Debug;
const Queue = std.PriorityQueue(Task, void, struct {
fn compare(_: void, a: Task, b: Task) std.math.Order {
return std.math.order(a.run_at, b.run_at);
const time_order = std.math.order(a.run_at, b.run_at);
if (time_order != .eq) return time_order;
// Break ties with sequence number to maintain FIFO order
return std.math.order(a.sequence, b.sequence);
}
}.compare);
const Scheduler = @This();
_sequence: u64,
low_priority: Queue,
high_priority: Queue,
pub fn init(allocator: std.mem.Allocator) Scheduler {
return .{
._sequence = 0,
.low_priority = Queue.init(allocator, {}),
.high_priority = Queue.init(allocator, {}),
};
@@ -59,9 +64,12 @@ pub fn add(self: *Scheduler, ctx: *anyopaque, cb: Callback, run_in_ms: u32, opts
log.debug(.scheduler, "scheduler.add", .{ .name = opts.name, .run_in_ms = run_in_ms, .low_priority = opts.low_priority });
}
var queue = if (opts.low_priority) &self.low_priority else &self.high_priority;
const seq = self._sequence + 1;
self._sequence = seq;
return queue.add(.{
.ctx = ctx,
.callback = cb,
.sequence = seq,
.name = opts.name,
.run_at = timestamp(.monotonic) + run_in_ms,
});
@@ -105,6 +113,7 @@ fn runQueue(self: *Scheduler, queue: *Queue) !?u64 {
const Task = struct {
run_at: u64,
sequence: u64,
ctx: *anyopaque,
name: []const u8,
callback: Callback,

View File

@@ -751,6 +751,10 @@ const Script = struct {
break :blk true;
};
if (comptime IS_DEBUG) {
log.info(.browser, "executed script", .{.src = url});
}
defer page.tick();
if (success) {

View File

@@ -552,6 +552,8 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/event/ErrorEvent.zig"),
@import("../webapi/event/MessageEvent.zig"),
@import("../webapi/event/ProgressEvent.zig"),
@import("../webapi/MessageChannel.zig"),
@import("../webapi/MessagePort.zig"),
@import("../webapi/EventTarget.zig"),
@import("../webapi/Location.zig"),
@import("../webapi/Navigator.zig"),

View File

@@ -0,0 +1,86 @@
<!DOCTYPE html>
<body>
<script src="testing.js"></script>
<script id="basic">
{
const channel = new MessageChannel();
testing.expectEqual(true, channel.port1 !== undefined);
testing.expectEqual(true, channel.port2 !== undefined);
testing.expectEqual(true, channel.port1 !== channel.port2);
}
{
const channel = new MessageChannel();
let received = null;
channel.port2.onmessage = function(e) {
received = e.data;
};
channel.port1.postMessage('hello');
setTimeout(() => {
testing.expectEqual('hello', received);
}, 10);
}
testing.async(async () => {
let messages = [];
let p = new Promise((resolve) => {
const channel = new MessageChannel();
channel.port2.addEventListener('message', (e) => {
messages.push(e.data);
if (e.data === 'third') {
resolve();
}
});
channel.port2.start();
channel.port1.postMessage('first');
channel.port1.postMessage('second');
channel.port1.postMessage('third');
});
await p;
testing.expectEqual(3, messages.length);
testing.expectEqual('first', messages[0]);
testing.expectEqual('second', messages[1]);
testing.expectEqual('third', messages[2]);
});
{
const channel = new MessageChannel();
let port1Count = 0;
let port2Count = 0;
channel.port1.onmessage = () => { port1Count++; };
channel.port2.onmessage = () => { port2Count++; };
channel.port1.postMessage('to port2');
channel.port2.postMessage('to port1');
setTimeout(() => {
testing.expectEqual(1, port1Count);
testing.expectEqual(1, port2Count);
}, 30);
}
{
const channel = new MessageChannel();
let received = null;
channel.port2.onmessage = (e) => {
received = e.data;
};
channel.port1.postMessage({ type: 'test', value: 42 });
setTimeout(() => {
testing.expectEqual('test', received.type);
testing.expectEqual(42, received.value);
}, 40);
}
</script>

View File

@@ -34,6 +34,7 @@ pub const Type = union(enum) {
xhr: *@import("net/XMLHttpRequestEventTarget.zig"),
abort_signal: *@import("AbortSignal.zig"),
media_query_list: *@import("css/MediaQueryList.zig"),
message_port: *@import("MessagePort.zig"),
};
pub fn dispatchEvent(self: *EventTarget, event: *Event, page: *Page) !bool {
@@ -101,6 +102,7 @@ pub fn format(self: *EventTarget, writer: *std.Io.Writer) !void {
.xhr => writer.writeAll("<XMLHttpRequestEventTarget>"),
.abort_signal => writer.writeAll("<abort_signal>"),
.media_query_list => writer.writeAll("<MediaQueryList>"),
.message_port => writer.writeAll("<MessagePort>"),
};
}

View File

@@ -0,0 +1,66 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../js/js.zig");
const Page = @import("../Page.zig");
const MessagePort = @import("MessagePort.zig");
const MessageChannel = @This();
_port1: *MessagePort,
_port2: *MessagePort,
pub fn init(page: *Page) !*MessageChannel {
const port1 = try MessagePort.init(page);
const port2 = try MessagePort.init(page);
MessagePort.entangle(port1, port2);
return page._factory.create(MessageChannel{
._port1 = port1,
._port2 = port2,
});
}
pub fn getPort1(self: *const MessageChannel) *MessagePort {
return self._port1;
}
pub fn getPort2(self: *const MessageChannel) *MessagePort {
return self._port2;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(MessageChannel);
pub const Meta = struct {
pub const name = "MessageChannel";
pub var class_id: bridge.ClassId = undefined;
pub const prototype_chain = bridge.prototypeChain();
};
pub const constructor = bridge.constructor(MessageChannel.init, .{});
pub const port1 = bridge.accessor(MessageChannel.getPort1, null, .{});
pub const port2 = bridge.accessor(MessageChannel.getPort2, null, .{});
};
const testing = @import("../../testing.zig");
test "WebApi: MessageChannel" {
try testing.htmlRunner("message_channel.html", .{});
}

View File

@@ -0,0 +1,169 @@
// Copyright (C) 2023-2025 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const js = @import("../js/js.zig");
const log = @import("../../log.zig");
const Page = @import("../Page.zig");
const EventTarget = @import("EventTarget.zig");
const MessageEvent = @import("event/MessageEvent.zig");
const MessagePort = @This();
_proto: *EventTarget,
_enabled: bool = false,
_closed: bool = false,
_on_message: ?js.Function = null,
_on_message_error: ?js.Function = null,
_entangled_port: ?*MessagePort = null,
pub fn init(page: *Page) !*MessagePort {
return page._factory.eventTarget(MessagePort{
._proto = undefined,
});
}
pub fn asEventTarget(self: *MessagePort) *EventTarget {
return self._proto;
}
pub fn entangle(port1: *MessagePort, port2: *MessagePort) void {
port1._entangled_port = port2;
port2._entangled_port = port1;
}
pub fn postMessage(self: *MessagePort, message: js.Object, page: *Page) !void {
if (self._closed) {
return;
}
const other = self._entangled_port orelse return;
if (other._closed) {
return;
}
// Create callback to deliver message
const callback = try page._factory.create(PostMessageCallback{
.page = page,
.port = other,
.message = try message.persist(),
});
try page.scheduler.add(callback, PostMessageCallback.run, 0, .{
.name = "MessagePort.postMessage",
.low_priority = false,
});
}
pub fn start(self: *MessagePort) void {
if (self._closed) {
return;
}
self._enabled = true;
}
pub fn close(self: *MessagePort) void {
self._closed = true;
// Break entanglement
if (self._entangled_port) |other| {
other._entangled_port = null;
}
self._entangled_port = null;
}
pub fn getOnMessage(self: *const MessagePort) ?js.Function {
return self._on_message;
}
pub fn setOnMessage(self: *MessagePort, cb_: ?js.Function) !void {
if (cb_) |cb| {
self._on_message = cb;
} else {
self._on_message = null;
}
}
pub fn getOnMessageError(self: *const MessagePort) ?js.Function {
return self._on_message_error;
}
pub fn setOnMessageError(self: *MessagePort, cb_: ?js.Function) !void {
if (cb_) |cb| {
self._on_message_error = cb;
} else {
self._on_message_error = null;
}
}
const PostMessageCallback = struct {
port: *MessagePort,
message: js.Object,
page: *Page,
fn deinit(self: *PostMessageCallback) void {
self.page._factory.destroy(self);
}
fn run(ctx: *anyopaque) !?u32 {
const self: *PostMessageCallback = @ptrCast(@alignCast(ctx));
defer self.deinit();
if (self.port._closed) {
return null;
}
const event = MessageEvent.init("message", .{
.data = self.message,
.origin = "",
.source = null,
}, self.page) catch |err| {
log.err(.dom, "MessagePort.postMessage", .{.err = err});
return null;
};
self.page._event_manager.dispatchWithFunction(
self.port.asEventTarget(),
event.asEvent(),
self.port._on_message,
.{ .context = "MessagePort message" },
) catch |err| {
log.err(.dom, "MessagePort.postMessage", .{.err = err});
};
return null;
}
};
pub const JsApi = struct {
pub const bridge = js.Bridge(MessagePort);
pub const Meta = struct {
pub const name = "MessagePort";
pub var class_id: bridge.ClassId = undefined;
pub const prototype_chain = bridge.prototypeChain();
};
pub const postMessage = bridge.function(MessagePort.postMessage, .{});
pub const start = bridge.function(MessagePort.start, .{});
pub const close = bridge.function(MessagePort.close, .{});
pub const onmessage = bridge.accessor(MessagePort.getOnMessage, MessagePort.setOnMessage, .{});
pub const onmessageerror = bridge.accessor(MessagePort.getOnMessageError, MessagePort.setOnMessageError, .{});
};