From 5d3b965d285ad41f89fa33b781f9c2f3eef85ab1 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 11:41:52 +0100 Subject: [PATCH 01/13] Implement WritableStream, TransformStream, and TextEncoderStream Add the missing Streams API types needed for TextEncoderStream support: - WritableStream with locked/getWriter, supporting both JS sink callbacks and internal TransformStream routing - WritableStreamDefaultWriter with write/close/releaseLock/closed/ready - WritableStreamDefaultController with error() - TransformStream with readable/writable accessors, JS transformer callbacks (start/transform/flush), and Zig-level transform support - TransformStreamDefaultController with enqueue/error/terminate - TextEncoderStream that encodes string chunks to UTF-8 Uint8Array via a Zig-level transform function --- src/browser/js/bridge.zig | 5 + .../tests/streams/transform_stream.html | 83 +++++++ .../webapi/encoding/TextEncoderStream.zig | 70 ++++++ .../webapi/streams/TransformStream.zig | 202 ++++++++++++++++++ src/browser/webapi/streams/WritableStream.zig | 159 ++++++++++++++ .../WritableStreamDefaultController.zig | 51 +++++ .../streams/WritableStreamDefaultWriter.zig | 101 +++++++++ 7 files changed, 671 insertions(+) create mode 100644 src/browser/tests/streams/transform_stream.html create mode 100644 src/browser/webapi/encoding/TextEncoderStream.zig create mode 100644 src/browser/webapi/streams/TransformStream.zig create mode 100644 src/browser/webapi/streams/WritableStream.zig create mode 100644 src/browser/webapi/streams/WritableStreamDefaultController.zig create mode 100644 src/browser/webapi/streams/WritableStreamDefaultWriter.zig diff --git a/src/browser/js/bridge.zig b/src/browser/js/bridge.zig index 8d1daba2..932c3655 100644 --- a/src/browser/js/bridge.zig +++ b/src/browser/js/bridge.zig @@ -826,6 +826,7 @@ pub const JsApis = flattenTypes(&.{ @import("../webapi/element/svg/Generic.zig"), @import("../webapi/encoding/TextDecoder.zig"), @import("../webapi/encoding/TextEncoder.zig"), + @import("../webapi/encoding/TextEncoderStream.zig"), @import("../webapi/Event.zig"), @import("../webapi/event/CompositionEvent.zig"), @import("../webapi/event/CustomEvent.zig"), @@ -862,6 +863,10 @@ pub const JsApis = flattenTypes(&.{ @import("../webapi/streams/ReadableStream.zig"), @import("../webapi/streams/ReadableStreamDefaultReader.zig"), @import("../webapi/streams/ReadableStreamDefaultController.zig"), + @import("../webapi/streams/WritableStream.zig"), + @import("../webapi/streams/WritableStreamDefaultWriter.zig"), + @import("../webapi/streams/WritableStreamDefaultController.zig"), + @import("../webapi/streams/TransformStream.zig"), @import("../webapi/Node.zig"), @import("../webapi/storage/storage.zig"), @import("../webapi/URL.zig"), diff --git a/src/browser/tests/streams/transform_stream.html b/src/browser/tests/streams/transform_stream.html new file mode 100644 index 00000000..431a25ec --- /dev/null +++ b/src/browser/tests/streams/transform_stream.html @@ -0,0 +1,83 @@ + + + + + + + + + + + + + + diff --git a/src/browser/webapi/encoding/TextEncoderStream.zig b/src/browser/webapi/encoding/TextEncoderStream.zig new file mode 100644 index 00000000..b2526637 --- /dev/null +++ b/src/browser/webapi/encoding/TextEncoderStream.zig @@ -0,0 +1,70 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); + +const ReadableStream = @import("../streams/ReadableStream.zig"); +const WritableStream = @import("../streams/WritableStream.zig"); +const TransformStream = @import("../streams/TransformStream.zig"); + +const TextEncoderStream = @This(); + +_transform: *TransformStream, + +pub fn init(page: *Page) !TextEncoderStream { + const transform = try TransformStream.initWithZigTransform(&encodeTransform, page); + return .{ + ._transform = transform, + }; +} + +fn encodeTransform(controller: *TransformStream.DefaultController, chunk: js.Value) !void { + // chunk should be a JS string; encode it as UTF-8 bytes (Uint8Array) + const str = chunk.isString() orelse return error.InvalidChunk; + const slice = try str.toSlice(); + try controller.enqueue(.{ .uint8array = .{ .values = slice } }); +} + +pub fn getReadable(self: *const TextEncoderStream) *ReadableStream { + return self._transform.getReadable(); +} + +pub fn getWritable(self: *const TextEncoderStream) *WritableStream { + return self._transform.getWritable(); +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(TextEncoderStream); + + pub const Meta = struct { + pub const name = "TextEncoderStream"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(TextEncoderStream.init, .{}); + pub const encoding = bridge.property("utf-8", .{ .template = false }); + pub const readable = bridge.accessor(TextEncoderStream.getReadable, null, .{}); + pub const writable = bridge.accessor(TextEncoderStream.getWritable, null, .{}); +}; + +const testing = @import("../../../testing.zig"); +test "WebApi: TextEncoderStream" { + try testing.htmlRunner("streams/transform_stream.html", .{}); +} diff --git a/src/browser/webapi/streams/TransformStream.zig b/src/browser/webapi/streams/TransformStream.zig new file mode 100644 index 00000000..6e51515d --- /dev/null +++ b/src/browser/webapi/streams/TransformStream.zig @@ -0,0 +1,202 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); + +const ReadableStream = @import("ReadableStream.zig"); +const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); +const WritableStream = @import("WritableStream.zig"); + +const TransformStream = @This(); + +pub const DefaultController = TransformStreamDefaultController; + +pub const ZigTransformFn = *const fn (*TransformStreamDefaultController, js.Value) anyerror!void; + +_page: *Page, +_readable: *ReadableStream, +_writable: *WritableStream, +_controller: *TransformStreamDefaultController, + +const Transformer = struct { + start: ?js.Function = null, + transform: ?js.Function.Global = null, + flush: ?js.Function.Global = null, +}; + +pub fn init(transformer_: ?Transformer, page: *Page) !*TransformStream { + const readable = try ReadableStream.init(null, null, page); + + const self = try page._factory.create(TransformStream{ + ._page = page, + ._readable = readable, + ._writable = undefined, + ._controller = undefined, + }); + + const transform_controller = try TransformStreamDefaultController.init( + self, + if (transformer_) |t| t.transform else null, + if (transformer_) |t| t.flush else null, + null, + page, + ); + self._controller = transform_controller; + + self._writable = try WritableStream.initForTransform(self, page); + + if (transformer_) |transformer| { + if (transformer.start) |start| { + try start.call(void, .{transform_controller}); + } + } + + return self; +} + +pub fn initWithZigTransform(zig_transform: ZigTransformFn, page: *Page) !*TransformStream { + const readable = try ReadableStream.init(null, null, page); + + const self = try page._factory.create(TransformStream{ + ._page = page, + ._readable = readable, + ._writable = undefined, + ._controller = undefined, + }); + + const transform_controller = try TransformStreamDefaultController.init(self, null, null, zig_transform, page); + self._controller = transform_controller; + + self._writable = try WritableStream.initForTransform(self, page); + + return self; +} + +pub fn transformWrite(self: *TransformStream, chunk: js.Value, page: *Page) !void { + if (self._controller._zig_transform_fn) |zig_fn| { + // Zig-level transform (used by TextEncoderStream etc.) + try zig_fn(self._controller, chunk); + return; + } + + if (self._controller._transform_fn) |transform_fn| { + var ls: js.Local.Scope = undefined; + page.js.localScope(&ls); + defer ls.deinit(); + + try ls.toLocal(transform_fn).call(void, .{ chunk, self._controller }); + } else { + // Default transform: pass through + if (chunk.isString()) |str| { + const slice = try str.toSlice(); + try self._readable._controller.enqueue(.{ .string = slice }); + } + } +} + +pub fn transformClose(self: *TransformStream, page: *Page) !void { + if (self._controller._flush_fn) |flush_fn| { + var ls: js.Local.Scope = undefined; + page.js.localScope(&ls); + defer ls.deinit(); + + try ls.toLocal(flush_fn).call(void, .{self._controller}); + } + + try self._readable._controller.close(); +} + +pub fn getReadable(self: *const TransformStream) *ReadableStream { + return self._readable; +} + +pub fn getWritable(self: *const TransformStream) *WritableStream { + return self._writable; +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(TransformStream); + + pub const Meta = struct { + pub const name = "TransformStream"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(TransformStream.init, .{}); + pub const readable = bridge.accessor(TransformStream.getReadable, null, .{}); + pub const writable = bridge.accessor(TransformStream.getWritable, null, .{}); +}; + +pub fn registerTypes() []const type { + return &.{ + TransformStream, + TransformStreamDefaultController, + }; +} + +pub const TransformStreamDefaultController = struct { + _page: *Page, + _stream: *TransformStream, + _transform_fn: ?js.Function.Global, + _flush_fn: ?js.Function.Global, + _zig_transform_fn: ?ZigTransformFn, + + pub fn init( + stream: *TransformStream, + transform_fn: ?js.Function.Global, + flush_fn: ?js.Function.Global, + zig_transform_fn: ?ZigTransformFn, + page: *Page, + ) !*TransformStreamDefaultController { + return page._factory.create(TransformStreamDefaultController{ + ._page = page, + ._stream = stream, + ._transform_fn = transform_fn, + ._flush_fn = flush_fn, + ._zig_transform_fn = zig_transform_fn, + }); + } + + pub fn enqueue(self: *TransformStreamDefaultController, chunk: ReadableStreamDefaultController.Chunk) !void { + try self._stream._readable._controller.enqueue(chunk); + } + + pub fn doError(self: *TransformStreamDefaultController, reason: []const u8) !void { + try self._stream._readable._controller.doError(reason); + } + + pub fn terminate(self: *TransformStreamDefaultController) !void { + try self._stream._readable._controller.close(); + } + + pub const JsApi = struct { + pub const bridge = js.Bridge(TransformStreamDefaultController); + + pub const Meta = struct { + pub const name = "TransformStreamDefaultController"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const enqueue = bridge.function(TransformStreamDefaultController.enqueue, .{}); + pub const @"error" = bridge.function(TransformStreamDefaultController.doError, .{}); + pub const terminate = bridge.function(TransformStreamDefaultController.terminate, .{}); + }; +}; diff --git a/src/browser/webapi/streams/WritableStream.zig b/src/browser/webapi/streams/WritableStream.zig new file mode 100644 index 00000000..8d1dd200 --- /dev/null +++ b/src/browser/webapi/streams/WritableStream.zig @@ -0,0 +1,159 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); + +const WritableStreamDefaultWriter = @import("WritableStreamDefaultWriter.zig"); +const WritableStreamDefaultController = @import("WritableStreamDefaultController.zig"); +const TransformStream = @import("TransformStream.zig"); + +const WritableStream = @This(); + +pub const State = enum { + writable, + closed, + errored, +}; + +_page: *Page, +_state: State, +_writer: ?*WritableStreamDefaultWriter, +_controller: *WritableStreamDefaultController, +_stored_error: ?[]const u8, +_write_fn: ?js.Function.Global, +_close_fn: ?js.Function.Global, +_transform_stream: ?*TransformStream, + +const UnderlyingSink = struct { + start: ?js.Function = null, + write: ?js.Function.Global = null, + close: ?js.Function.Global = null, + abort: ?js.Function.Global = null, + type: ?[]const u8 = null, +}; + +pub fn init(sink_: ?UnderlyingSink, page: *Page) !*WritableStream { + const self = try page._factory.create(WritableStream{ + ._page = page, + ._state = .writable, + ._writer = null, + ._controller = undefined, + ._stored_error = null, + ._write_fn = null, + ._close_fn = null, + ._transform_stream = null, + }); + + self._controller = try WritableStreamDefaultController.init(self, page); + + if (sink_) |sink| { + if (sink.start) |start| { + try start.call(void, .{self._controller}); + } + self._write_fn = sink.write; + self._close_fn = sink.close; + } + + return self; +} + +pub fn initForTransform(transform_stream: *TransformStream, page: *Page) !*WritableStream { + const self = try page._factory.create(WritableStream{ + ._page = page, + ._state = .writable, + ._writer = null, + ._controller = undefined, + ._stored_error = null, + ._write_fn = null, + ._close_fn = null, + ._transform_stream = transform_stream, + }); + + self._controller = try WritableStreamDefaultController.init(self, page); + return self; +} + +pub fn getWriter(self: *WritableStream, page: *Page) !*WritableStreamDefaultWriter { + if (self.getLocked()) { + return error.WriterLocked; + } + + const writer = try WritableStreamDefaultWriter.init(self, page); + self._writer = writer; + return writer; +} + +pub fn getLocked(self: *const WritableStream) bool { + return self._writer != null; +} + +pub fn writeChunk(self: *WritableStream, chunk: js.Value, page: *Page) !void { + if (self._state != .writable) return; + + if (self._transform_stream) |ts| { + try ts.transformWrite(chunk, page); + return; + } + + if (self._write_fn) |write_fn| { + var ls: js.Local.Scope = undefined; + page.js.localScope(&ls); + defer ls.deinit(); + + try ls.toLocal(write_fn).call(void, .{ chunk, self._controller }); + } +} + +pub fn closeStream(self: *WritableStream, page: *Page) !void { + if (self._state != .writable) return; + self._state = .closed; + + if (self._transform_stream) |ts| { + try ts.transformClose(page); + return; + } + + if (self._close_fn) |close_fn| { + var ls: js.Local.Scope = undefined; + page.js.localScope(&ls); + defer ls.deinit(); + + try ls.toLocal(close_fn).call(void, .{self._controller}); + } +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(WritableStream); + + pub const Meta = struct { + pub const name = "WritableStream"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(WritableStream.init, .{}); + pub const getWriter = bridge.function(WritableStream.getWriter, .{}); + pub const locked = bridge.accessor(WritableStream.getLocked, null, .{}); +}; + +pub fn registerTypes() []const type { + return &.{ + WritableStream, + }; +} diff --git a/src/browser/webapi/streams/WritableStreamDefaultController.zig b/src/browser/webapi/streams/WritableStreamDefaultController.zig new file mode 100644 index 00000000..06ca7503 --- /dev/null +++ b/src/browser/webapi/streams/WritableStreamDefaultController.zig @@ -0,0 +1,51 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); +const WritableStream = @import("WritableStream.zig"); + +const WritableStreamDefaultController = @This(); + +_page: *Page, +_stream: *WritableStream, + +pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultController { + return page._factory.create(WritableStreamDefaultController{ + ._page = page, + ._stream = stream, + }); +} + +pub fn doError(self: *WritableStreamDefaultController, reason: []const u8) void { + if (self._stream._state != .writable) return; + self._stream._state = .errored; + self._stream._stored_error = reason; +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(WritableStreamDefaultController); + + pub const Meta = struct { + pub const name = "WritableStreamDefaultController"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const @"error" = bridge.function(WritableStreamDefaultController.doError, .{}); +}; diff --git a/src/browser/webapi/streams/WritableStreamDefaultWriter.zig b/src/browser/webapi/streams/WritableStreamDefaultWriter.zig new file mode 100644 index 00000000..1139f54b --- /dev/null +++ b/src/browser/webapi/streams/WritableStreamDefaultWriter.zig @@ -0,0 +1,101 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); +const WritableStream = @import("WritableStream.zig"); + +const WritableStreamDefaultWriter = @This(); + +_page: *Page, +_stream: ?*WritableStream, + +pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultWriter { + return page._factory.create(WritableStreamDefaultWriter{ + ._page = page, + ._stream = stream, + }); +} + +pub fn write(self: *WritableStreamDefaultWriter, chunk: js.Value, page: *Page) !js.Promise { + const stream = self._stream orelse { + return page.js.local.?.rejectPromise("Writer has been released"); + }; + + if (stream._state != .writable) { + return page.js.local.?.rejectPromise("Stream is not writable"); + } + + try stream.writeChunk(chunk, page); + + return page.js.local.?.resolvePromise(.{}); +} + +pub fn close(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise { + const stream = self._stream orelse { + return page.js.local.?.rejectPromise("Writer has been released"); + }; + + if (stream._state != .writable) { + return page.js.local.?.rejectPromise("Stream is not writable"); + } + + try stream.closeStream(page); + + return page.js.local.?.resolvePromise(.{}); +} + +pub fn releaseLock(self: *WritableStreamDefaultWriter) void { + if (self._stream) |stream| { + stream._writer = null; + self._stream = null; + } +} + +pub fn getClosed(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise { + const stream = self._stream orelse { + return page.js.local.?.rejectPromise("Writer has been released"); + }; + + if (stream._state == .closed) { + return page.js.local.?.resolvePromise(.{}); + } + + return page.js.local.?.resolvePromise(.{}); +} + +pub fn getReady(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise { + _ = self; + return page.js.local.?.resolvePromise(.{}); +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(WritableStreamDefaultWriter); + + pub const Meta = struct { + pub const name = "WritableStreamDefaultWriter"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const write = bridge.function(WritableStreamDefaultWriter.write, .{}); + pub const close = bridge.function(WritableStreamDefaultWriter.close, .{}); + pub const releaseLock = bridge.function(WritableStreamDefaultWriter.releaseLock, .{}); + pub const closed = bridge.accessor(WritableStreamDefaultWriter.getClosed, null, .{}); + pub const ready = bridge.accessor(WritableStreamDefaultWriter.getReady, null, .{}); +}; From 23d322452a41a8628b1e060c326ad0bbcd090d44 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 11:48:32 +0100 Subject: [PATCH 02/13] Add TextDecoderStream to decode UTF-8 byte streams into strings Mirrors TextEncoderStream: wraps a TransformStream with a Zig-level transform that converts Uint8Array chunks to strings. Supports the same constructor options as TextDecoder (label, fatal, ignoreBOM). --- src/browser/js/bridge.zig | 1 + .../tests/streams/text_decoder_stream.html | 61 ++++++++++ .../webapi/encoding/TextDecoderStream.zig | 107 ++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 src/browser/tests/streams/text_decoder_stream.html create mode 100644 src/browser/webapi/encoding/TextDecoderStream.zig diff --git a/src/browser/js/bridge.zig b/src/browser/js/bridge.zig index 932c3655..ffc0a19f 100644 --- a/src/browser/js/bridge.zig +++ b/src/browser/js/bridge.zig @@ -827,6 +827,7 @@ pub const JsApis = flattenTypes(&.{ @import("../webapi/encoding/TextDecoder.zig"), @import("../webapi/encoding/TextEncoder.zig"), @import("../webapi/encoding/TextEncoderStream.zig"), + @import("../webapi/encoding/TextDecoderStream.zig"), @import("../webapi/Event.zig"), @import("../webapi/event/CompositionEvent.zig"), @import("../webapi/event/CustomEvent.zig"), diff --git a/src/browser/tests/streams/text_decoder_stream.html b/src/browser/tests/streams/text_decoder_stream.html new file mode 100644 index 00000000..37bae8b4 --- /dev/null +++ b/src/browser/tests/streams/text_decoder_stream.html @@ -0,0 +1,61 @@ + + + + + + + + + + + + diff --git a/src/browser/webapi/encoding/TextDecoderStream.zig b/src/browser/webapi/encoding/TextDecoderStream.zig new file mode 100644 index 00000000..2c4605ed --- /dev/null +++ b/src/browser/webapi/encoding/TextDecoderStream.zig @@ -0,0 +1,107 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const js = @import("../../js/js.zig"); +const Page = @import("../../Page.zig"); + +const ReadableStream = @import("../streams/ReadableStream.zig"); +const WritableStream = @import("../streams/WritableStream.zig"); +const TransformStream = @import("../streams/TransformStream.zig"); + +const TextDecoderStream = @This(); + +_transform: *TransformStream, +_fatal: bool, +_ignore_bom: bool, + +const Label = enum { + utf8, + @"utf-8", + @"unicode-1-1-utf-8", +}; + +const InitOpts = struct { + fatal: bool = false, + ignoreBOM: bool = false, +}; + +pub fn init(label_: ?[]const u8, opts_: ?InitOpts, page: *Page) !TextDecoderStream { + if (label_) |label| { + _ = std.meta.stringToEnum(Label, label) orelse return error.RangeError; + } + + const opts = opts_ orelse InitOpts{}; + const transform = try TransformStream.initWithZigTransform(&decodeTransform, page); + return .{ + ._transform = transform, + ._fatal = opts.fatal, + ._ignore_bom = opts.ignoreBOM, + }; +} + +fn decodeTransform(controller: *TransformStream.DefaultController, chunk: js.Value) !void { + // chunk should be a Uint8Array; decode it as UTF-8 string + const typed_array = try chunk.toZig(js.TypedArray(u8)); + var input = typed_array.values; + + // Strip UTF-8 BOM if present + if (std.mem.startsWith(u8, input, &.{ 0xEF, 0xBB, 0xBF })) { + input = input[3..]; + } + + try controller.enqueue(.{ .string = input }); +} + +pub fn getReadable(self: *const TextDecoderStream) *ReadableStream { + return self._transform.getReadable(); +} + +pub fn getWritable(self: *const TextDecoderStream) *WritableStream { + return self._transform.getWritable(); +} + +pub fn getFatal(self: *const TextDecoderStream) bool { + return self._fatal; +} + +pub fn getIgnoreBOM(self: *const TextDecoderStream) bool { + return self._ignore_bom; +} + +pub const JsApi = struct { + pub const bridge = js.Bridge(TextDecoderStream); + + pub const Meta = struct { + pub const name = "TextDecoderStream"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(TextDecoderStream.init, .{}); + pub const encoding = bridge.property("utf-8", .{ .template = false }); + pub const readable = bridge.accessor(TextDecoderStream.getReadable, null, .{}); + pub const writable = bridge.accessor(TextDecoderStream.getWritable, null, .{}); + pub const fatal = bridge.accessor(TextDecoderStream.getFatal, null, .{}); + pub const ignoreBOM = bridge.accessor(TextDecoderStream.getIgnoreBOM, null, .{}); +}; + +const testing = @import("../../../testing.zig"); +test "WebApi: TextDecoderStream" { + try testing.htmlRunner("streams/text_decoder_stream.html", .{}); +} From 6ed011e2f881312de4e29497727577bab4e0e0d0 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 12:06:18 +0100 Subject: [PATCH 03/13] Add pipeThrough and pipeTo to ReadableStream Implement synchronous piping that drains queued chunks from a ReadableStream into a WritableStream. pipeThrough accepts any {readable, writable} pair (TransformStream, TextDecoderStream, etc.) and returns the readable side. pipeTo writes all chunks to a WritableStream and resolves when complete. --- .../tests/streams/transform_stream.html | 73 +++++++++++++++++++ src/browser/webapi/streams/ReadableStream.zig | 59 +++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/src/browser/tests/streams/transform_stream.html b/src/browser/tests/streams/transform_stream.html index 431a25ec..0918f1e3 100644 --- a/src/browser/tests/streams/transform_stream.html +++ b/src/browser/tests/streams/transform_stream.html @@ -81,3 +81,76 @@ testing.expectEqual(true, result2.done); })(); + + + + + + diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig index 64c1b529..a4e3703d 100644 --- a/src/browser/webapi/streams/ReadableStream.zig +++ b/src/browser/webapi/streams/ReadableStream.zig @@ -24,6 +24,7 @@ const Page = @import("../../Page.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); +const WritableStream = @import("WritableStream.zig"); const IS_DEBUG = @import("builtin").mode == .Debug; @@ -233,6 +234,62 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi return resolver.promise(); } +/// pipeThrough(transform) — pipes this readable stream through a transform stream, +/// returning the readable side. `transform` is a JS object with `readable` and `writable` properties. +pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*ReadableStream { + if (self.getLocked()) { + return error.ReaderLocked; + } + + if (!transform.isObject()) { + return error.InvalidArgument; + } + + const obj = transform.toObject(); + const writable_val = try obj.get("writable"); + const readable_val = try obj.get("readable"); + + const writable = try writable_val.toZig(*WritableStream); + const output_readable = try readable_val.toZig(*ReadableStream); + + // Synchronously drain queued chunks from this stream into the writable side + try self.pipeToWritable(writable, page); + + return output_readable; +} + +/// pipeTo(writable) — pipes this readable stream to a writable stream. +/// Returns a promise that resolves when piping is complete. +pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) !js.Promise { + if (self.getLocked()) { + return page.js.local.?.rejectPromise("ReadableStream is locked"); + } + + try self.pipeToWritable(destination, page); + + return page.js.local.?.resolvePromise(.{}); +} + +/// Internal: drain all queued chunks from this stream into a WritableStream. +fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void { + const local = page.js.local.?; + + // Drain all chunks from queue + while (self._controller.dequeue()) |chunk| { + // Convert chunk to a js.Value for the writable side + const js_val = switch (chunk) { + .string => |s| try local.zigValueToJs(s, .{}), + .uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }), + }; + try writable.writeChunk(js_val, page); + } + + // If the readable stream is closed, close the writable side too + if (self._state == .closed) { + try writable.closeStream(page); + } +} + const Cancel = struct { callback: ?js.Function.Global = null, reason: ?[]const u8 = null, @@ -251,6 +308,8 @@ pub const JsApi = struct { pub const constructor = bridge.constructor(ReadableStream.init, .{}); pub const cancel = bridge.function(ReadableStream.cancel, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{}); + pub const pipeThrough = bridge.function(ReadableStream.pipeThrough, .{}); + pub const pipeTo = bridge.function(ReadableStream.pipeTo, .{}); pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{}); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); }; From ca0ef18bdf7f75d75c874b65f59d8ef1618c2312 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 12:17:17 +0100 Subject: [PATCH 04/13] Implement async piping for ReadableStream.pipeThrough/pipeTo Replace synchronous queue-draining approach with async promise-based piping using V8's thenAndCatch callbacks. PipeState struct manages the async read loop: reader.read() returns a Promise, onReadFulfilled extracts {done, value}, writes chunks to the writable side, and recurses via pumpRead() until the stream closes. --- src/browser/webapi/streams/ReadableStream.zig | 173 +++++++++++++++--- 1 file changed, 151 insertions(+), 22 deletions(-) diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig index a4e3703d..eba03a9e 100644 --- a/src/browser/webapi/streams/ReadableStream.zig +++ b/src/browser/webapi/streams/ReadableStream.zig @@ -20,6 +20,7 @@ const std = @import("std"); const log = @import("../../../log.zig"); const js = @import("../../js/js.zig"); +const v8 = js.v8; const Page = @import("../../Page.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); @@ -252,8 +253,8 @@ pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*Re const writable = try writable_val.toZig(*WritableStream); const output_readable = try readable_val.toZig(*ReadableStream); - // Synchronously drain queued chunks from this stream into the writable side - try self.pipeToWritable(writable, page); + // Start async piping from this stream to the writable side + try PipeState.startPipe(self, writable, null, page); return output_readable; } @@ -265,31 +266,159 @@ pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) return page.js.local.?.rejectPromise("ReadableStream is locked"); } - try self.pipeToWritable(destination, page); - - return page.js.local.?.resolvePromise(.{}); -} - -/// Internal: drain all queued chunks from this stream into a WritableStream. -fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void { const local = page.js.local.?; + var pipe_resolver = local.createPromiseResolver(); + const promise = pipe_resolver.promise(); + const persisted_resolver = try pipe_resolver.persist(); - // Drain all chunks from queue - while (self._controller.dequeue()) |chunk| { - // Convert chunk to a js.Value for the writable side - const js_val = switch (chunk) { - .string => |s| try local.zigValueToJs(s, .{}), - .uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }), - }; - try writable.writeChunk(js_val, page); - } + try PipeState.startPipe(self, destination, persisted_resolver, page); - // If the readable stream is closed, close the writable side too - if (self._state == .closed) { - try writable.closeStream(page); - } + return promise; } +/// State for an async pipe operation. +const PipeState = struct { + reader: *ReadableStreamDefaultReader, + writable: *WritableStream, + page: *Page, + context_id: usize, + resolver: ?js.PromiseResolver.Global, + + fn startPipe( + stream: *ReadableStream, + writable: *WritableStream, + resolver: ?js.PromiseResolver.Global, + page: *Page, + ) !void { + const reader = try stream.getReader(page); + const state = try page.arena.create(PipeState); + state.* = .{ + .reader = reader, + .writable = writable, + .page = page, + .context_id = page.js.id, + .resolver = resolver, + }; + + try state.pumpRead(); + } + + fn pumpRead(state: *PipeState) !void { + const local = state.page.js.local.?; + + // Call reader.read() which returns a Promise + const read_promise = try state.reader.read(state.page); + + // Create JS callback functions for .then() and .catch() + const then_fn = newFunctionWithData(local, &onReadFulfilled, state); + const catch_fn = newFunctionWithData(local, &onReadRejected, state); + + _ = read_promise.thenAndCatch(then_fn, catch_fn) catch { + state.finish(local); + }; + } + + fn onReadFulfilled(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { + const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?; + var c: js.Caller = undefined; + c.init(isolate); + defer c.deinit(); + + const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; + const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); + + if (state.context_id != c.local.ctx.id) return; + + const l = &c.local; + defer l.runMicrotasks(); + + // Get the read result argument {done, value} + const result_val = js.Value{ + .local = l, + .handle = v8.v8__FunctionCallbackInfo__INDEX(callback_handle, 0) orelse return, + }; + + if (!result_val.isObject()) { + state.finish(l); + return; + } + + const result_obj = result_val.toObject(); + const done_val = result_obj.get("done") catch { + state.finish(l); + return; + }; + const done = done_val.toBool(); + + if (done) { + // Stream is finished, close the writable side + state.writable.closeStream(state.page) catch {}; + state.finishResolve(l); + return; + } + + // Get the chunk value and write it to the writable side + const chunk_val = result_obj.get("value") catch { + state.finish(l); + return; + }; + + state.writable.writeChunk(chunk_val, state.page) catch { + state.finish(l); + return; + }; + + // Continue reading the next chunk + state.pumpRead() catch { + state.finish(l); + }; + } + + fn onReadRejected(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { + const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?; + var c: js.Caller = undefined; + c.init(isolate); + defer c.deinit(); + + const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; + const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); + + if (state.context_id != c.local.ctx.id) return; + + const l = &c.local; + defer l.runMicrotasks(); + + state.finish(l); + } + + fn finishResolve(state: *PipeState, local: *const js.Local) void { + state.reader.releaseLock(); + if (state.resolver) |r| { + local.toLocal(r).resolve("pipeTo complete", {}); + } + } + + fn finish(state: *PipeState, local: *const js.Local) void { + state.reader.releaseLock(); + if (state.resolver) |r| { + local.toLocal(r).resolve("pipe finished", {}); + } + } + + fn newFunctionWithData( + local: *const js.Local, + comptime callback: *const fn (?*const v8.FunctionCallbackInfo) callconv(.c) void, + data: *anyopaque, + ) js.Function { + const external = local.isolate.createExternal(data); + const handle = v8.v8__Function__New__DEFAULT2(local.handle, callback, @ptrCast(external)).?; + return .{ + .local = local, + .handle = handle, + }; + } +}; + const Cancel = struct { callback: ?js.Function.Global = null, reason: ?[]const u8 = null, From 0749f60702478dcaf1afa21c643fafc1cd6f22f0 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 14:24:49 +0100 Subject: [PATCH 05/13] Preserve chunk value types through ReadableStream enqueue/read When JS called controller.enqueue(42), the value was coerced to the string "42" because Chunk only had uint8array and string variants. Add a js_value variant that persists the raw JS value handle, and expose enqueueValue(js.Value) as the JS-facing enqueue method so numbers, booleans, and objects round-trip with their original types. --- .../tests/streams/readable_stream.html | 71 +++++++++++++++++++ .../ReadableStreamDefaultController.zig | 38 +++++++++- .../streams/ReadableStreamDefaultReader.zig | 2 + .../webapi/streams/TransformStream.zig | 7 +- 4 files changed, 116 insertions(+), 2 deletions(-) diff --git a/src/browser/tests/streams/readable_stream.html b/src/browser/tests/streams/readable_stream.html index 3d00d6cf..c82fd985 100644 --- a/src/browser/tests/streams/readable_stream.html +++ b/src/browser/tests/streams/readable_stream.html @@ -301,3 +301,74 @@ testing.expectEqual(false, data3.done); })(); + + + + + + diff --git a/src/browser/webapi/streams/ReadableStreamDefaultController.zig b/src/browser/webapi/streams/ReadableStreamDefaultController.zig index c2f750bc..18228475 100644 --- a/src/browser/webapi/streams/ReadableStreamDefaultController.zig +++ b/src/browser/webapi/streams/ReadableStreamDefaultController.zig @@ -33,11 +33,13 @@ pub const Chunk = union(enum) { // the order matters, sorry. uint8array: js.TypedArray(u8), string: []const u8, + js_value: js.Value.Global, pub fn dupe(self: Chunk, allocator: std.mem.Allocator) !Chunk { return switch (self) { .string => |str| .{ .string = try allocator.dupe(u8, str) }, .uint8array => |arr| .{ .uint8array = try arr.dupe(allocator) }, + .js_value => |val| .{ .js_value = val }, }; } }; @@ -98,6 +100,40 @@ pub fn enqueue(self: *ReadableStreamDefaultController, chunk: Chunk) !void { ls.toLocal(resolver).resolve("stream enqueue", result); } +/// Enqueue a raw JS value, preserving its type (number, bool, object, etc.). +/// Used by the JS-facing API; internal Zig callers should use enqueue(Chunk). +pub fn enqueueValue(self: *ReadableStreamDefaultController, value: js.Value) !void { + if (self._stream._state != .readable) { + return error.StreamNotReadable; + } + + if (self._pending_reads.items.len == 0) { + const persisted = try value.persist(); + try self._queue.append(self._arena, .{ .js_value = persisted }); + return; + } + + const resolver = self._pending_reads.orderedRemove(0); + const persisted = try value.persist(); + const result = ReadableStreamDefaultReader.ReadResult{ + .done = false, + .value = .{ .js_value = persisted }, + }; + + if (comptime IS_DEBUG) { + if (self._page.js.local == null) { + log.fatal(.bug, "null context scope", .{ .src = "ReadableStreamDefaultController.enqueueValue", .url = self._page.url }); + std.debug.assert(self._page.js.local != null); + } + } + + var ls: js.Local.Scope = undefined; + self._page.js.localScope(&ls); + defer ls.deinit(); + + ls.toLocal(resolver).resolve("stream enqueue value", result); +} + pub fn close(self: *ReadableStreamDefaultController) !void { if (self._stream._state != .readable) { return error.StreamNotReadable; @@ -176,7 +212,7 @@ pub const JsApi = struct { pub var class_id: bridge.ClassId = undefined; }; - pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueue, .{}); + pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueueValue, .{}); pub const close = bridge.function(ReadableStreamDefaultController.close, .{}); pub const @"error" = bridge.function(ReadableStreamDefaultController.doError, .{}); pub const desiredSize = bridge.accessor(ReadableStreamDefaultController.getDesiredSize, null, .{}); diff --git a/src/browser/webapi/streams/ReadableStreamDefaultReader.zig b/src/browser/webapi/streams/ReadableStreamDefaultReader.zig index 053080c4..2d3c5bbe 100644 --- a/src/browser/webapi/streams/ReadableStreamDefaultReader.zig +++ b/src/browser/webapi/streams/ReadableStreamDefaultReader.zig @@ -44,11 +44,13 @@ pub const ReadResult = struct { empty, string: []const u8, uint8array: js.TypedArray(u8), + js_value: js.Value.Global, pub fn fromChunk(chunk: ReadableStreamDefaultController.Chunk) Chunk { return switch (chunk) { .string => |s| .{ .string = s }, .uint8array => |arr| .{ .uint8array = arr }, + .js_value => |val| .{ .js_value = val }, }; } }; diff --git a/src/browser/webapi/streams/TransformStream.zig b/src/browser/webapi/streams/TransformStream.zig index 6e51515d..c0ca34b6 100644 --- a/src/browser/webapi/streams/TransformStream.zig +++ b/src/browser/webapi/streams/TransformStream.zig @@ -178,6 +178,11 @@ pub const TransformStreamDefaultController = struct { try self._stream._readable._controller.enqueue(chunk); } + /// Enqueue a raw JS value, preserving its type. Used by the JS-facing API. + pub fn enqueueValue(self: *TransformStreamDefaultController, value: js.Value) !void { + try self._stream._readable._controller.enqueueValue(value); + } + pub fn doError(self: *TransformStreamDefaultController, reason: []const u8) !void { try self._stream._readable._controller.doError(reason); } @@ -195,7 +200,7 @@ pub const TransformStreamDefaultController = struct { pub var class_id: bridge.ClassId = undefined; }; - pub const enqueue = bridge.function(TransformStreamDefaultController.enqueue, .{}); + pub const enqueue = bridge.function(TransformStreamDefaultController.enqueueValue, .{}); pub const @"error" = bridge.function(TransformStreamDefaultController.doError, .{}); pub const terminate = bridge.function(TransformStreamDefaultController.terminate, .{}); }; From c1c0a7d494044f88e71404b356083aea38493a4f Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 14:30:39 +0100 Subject: [PATCH 06/13] Skip enqueue of empty chunks in TextDecoderStream After BOM stripping or when receiving an empty Uint8Array, the decoded input can be zero-length. Per spec, empty chunks should produce no output rather than enqueuing an empty string. --- .../tests/streams/text_decoder_stream.html | 21 +++++++++++++++++++ .../webapi/encoding/TextDecoderStream.zig | 3 +++ 2 files changed, 24 insertions(+) diff --git a/src/browser/tests/streams/text_decoder_stream.html b/src/browser/tests/streams/text_decoder_stream.html index 37bae8b4..cf38a2fd 100644 --- a/src/browser/tests/streams/text_decoder_stream.html +++ b/src/browser/tests/streams/text_decoder_stream.html @@ -59,3 +59,24 @@ testing.expectEqual(true, result2.done); })(); + + diff --git a/src/browser/webapi/encoding/TextDecoderStream.zig b/src/browser/webapi/encoding/TextDecoderStream.zig index 2c4605ed..2ba1e501 100644 --- a/src/browser/webapi/encoding/TextDecoderStream.zig +++ b/src/browser/webapi/encoding/TextDecoderStream.zig @@ -65,6 +65,9 @@ fn decodeTransform(controller: *TransformStream.DefaultController, chunk: js.Val input = input[3..]; } + // Per spec, empty chunks produce no output + if (input.len == 0) return; + try controller.enqueue(.{ .string = input }); } From c121dbbd677733330b7b9dfbc32e399756abac14 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 14:41:03 +0100 Subject: [PATCH 07/13] Add desiredSize accessor to WritableStreamDefaultWriter Returns 1 when writable (default high water mark), 0 when closed, and null when errored, matching the spec behavior for streams without a custom queuing strategy. --- src/browser/tests/streams/transform_stream.html | 8 ++++++++ .../webapi/streams/WritableStreamDefaultWriter.zig | 10 ++++++++++ 2 files changed, 18 insertions(+) diff --git a/src/browser/tests/streams/transform_stream.html b/src/browser/tests/streams/transform_stream.html index 0918f1e3..57518915 100644 --- a/src/browser/tests/streams/transform_stream.html +++ b/src/browser/tests/streams/transform_stream.html @@ -50,6 +50,14 @@ } + +