Implement WritableStream, TransformStream, and TextEncoderStream

Add the missing Streams API types needed for TextEncoderStream support:
- WritableStream with locked/getWriter, supporting both JS sink callbacks
and internal TransformStream routing
- WritableStreamDefaultWriter with write/close/releaseLock/closed/ready
- WritableStreamDefaultController with error()
- TransformStream with readable/writable accessors, JS transformer
callbacks (start/transform/flush), and Zig-level transform support
- TransformStreamDefaultController with enqueue/error/terminate
- TextEncoderStream that encodes string chunks to UTF-8 Uint8Array
via a Zig-level transform function
This commit is contained in:
Pierre Tachoire
2026-03-02 11:41:52 +01:00
parent 8c37cac957
commit 5d3b965d28
7 changed files with 671 additions and 0 deletions

View File

@@ -826,6 +826,7 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/element/svg/Generic.zig"), @import("../webapi/element/svg/Generic.zig"),
@import("../webapi/encoding/TextDecoder.zig"), @import("../webapi/encoding/TextDecoder.zig"),
@import("../webapi/encoding/TextEncoder.zig"), @import("../webapi/encoding/TextEncoder.zig"),
@import("../webapi/encoding/TextEncoderStream.zig"),
@import("../webapi/Event.zig"), @import("../webapi/Event.zig"),
@import("../webapi/event/CompositionEvent.zig"), @import("../webapi/event/CompositionEvent.zig"),
@import("../webapi/event/CustomEvent.zig"), @import("../webapi/event/CustomEvent.zig"),
@@ -862,6 +863,10 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/streams/ReadableStream.zig"), @import("../webapi/streams/ReadableStream.zig"),
@import("../webapi/streams/ReadableStreamDefaultReader.zig"), @import("../webapi/streams/ReadableStreamDefaultReader.zig"),
@import("../webapi/streams/ReadableStreamDefaultController.zig"), @import("../webapi/streams/ReadableStreamDefaultController.zig"),
@import("../webapi/streams/WritableStream.zig"),
@import("../webapi/streams/WritableStreamDefaultWriter.zig"),
@import("../webapi/streams/WritableStreamDefaultController.zig"),
@import("../webapi/streams/TransformStream.zig"),
@import("../webapi/Node.zig"), @import("../webapi/Node.zig"),
@import("../webapi/storage/storage.zig"), @import("../webapi/storage/storage.zig"),
@import("../webapi/URL.zig"), @import("../webapi/URL.zig"),

View File

@@ -0,0 +1,83 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=transform_stream_basic>
{
const ts = new TransformStream();
testing.expectEqual('object', typeof ts);
testing.expectEqual('object', typeof ts.readable);
testing.expectEqual('object', typeof ts.writable);
}
</script>
<script id=transform_stream_with_transformer>
(async function() {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();
await writer.write('hello');
await writer.close();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('HELLO', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=writable_stream_basic>
{
const ws = new WritableStream();
testing.expectEqual('object', typeof ws);
testing.expectEqual(false, ws.locked);
}
</script>
<script id=writable_stream_writer>
{
const ws = new WritableStream();
const writer = ws.getWriter();
testing.expectEqual('object', typeof writer);
testing.expectEqual(true, ws.locked);
}
</script>
<script id=text_encoder_stream_encoding>
{
const tes = new TextEncoderStream();
testing.expectEqual('utf-8', tes.encoding);
testing.expectEqual('object', typeof tes.readable);
testing.expectEqual('object', typeof tes.writable);
}
</script>
<script id=text_encoder_stream_encode>
(async function() {
const tes = new TextEncoderStream();
const writer = tes.writable.getWriter();
const reader = tes.readable.getReader();
await writer.write('hi');
await writer.close();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual(true, result.value instanceof Uint8Array);
// 'hi' in UTF-8 is [104, 105]
testing.expectEqual(104, result.value[0]);
testing.expectEqual(105, result.value[1]);
testing.expectEqual(2, result.value.length);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>

View File

@@ -0,0 +1,70 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("../streams/ReadableStream.zig");
const WritableStream = @import("../streams/WritableStream.zig");
const TransformStream = @import("../streams/TransformStream.zig");
const TextEncoderStream = @This();
_transform: *TransformStream,
pub fn init(page: *Page) !TextEncoderStream {
const transform = try TransformStream.initWithZigTransform(&encodeTransform, page);
return .{
._transform = transform,
};
}
fn encodeTransform(controller: *TransformStream.DefaultController, chunk: js.Value) !void {
// chunk should be a JS string; encode it as UTF-8 bytes (Uint8Array)
const str = chunk.isString() orelse return error.InvalidChunk;
const slice = try str.toSlice();
try controller.enqueue(.{ .uint8array = .{ .values = slice } });
}
pub fn getReadable(self: *const TextEncoderStream) *ReadableStream {
return self._transform.getReadable();
}
pub fn getWritable(self: *const TextEncoderStream) *WritableStream {
return self._transform.getWritable();
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TextEncoderStream);
pub const Meta = struct {
pub const name = "TextEncoderStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(TextEncoderStream.init, .{});
pub const encoding = bridge.property("utf-8", .{ .template = false });
pub const readable = bridge.accessor(TextEncoderStream.getReadable, null, .{});
pub const writable = bridge.accessor(TextEncoderStream.getWritable, null, .{});
};
const testing = @import("../../../testing.zig");
test "WebApi: TextEncoderStream" {
try testing.htmlRunner("streams/transform_stream.html", .{});
}

View File

@@ -0,0 +1,202 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const WritableStream = @import("WritableStream.zig");
const TransformStream = @This();
pub const DefaultController = TransformStreamDefaultController;
pub const ZigTransformFn = *const fn (*TransformStreamDefaultController, js.Value) anyerror!void;
_page: *Page,
_readable: *ReadableStream,
_writable: *WritableStream,
_controller: *TransformStreamDefaultController,
const Transformer = struct {
start: ?js.Function = null,
transform: ?js.Function.Global = null,
flush: ?js.Function.Global = null,
};
pub fn init(transformer_: ?Transformer, page: *Page) !*TransformStream {
const readable = try ReadableStream.init(null, null, page);
const self = try page._factory.create(TransformStream{
._page = page,
._readable = readable,
._writable = undefined,
._controller = undefined,
});
const transform_controller = try TransformStreamDefaultController.init(
self,
if (transformer_) |t| t.transform else null,
if (transformer_) |t| t.flush else null,
null,
page,
);
self._controller = transform_controller;
self._writable = try WritableStream.initForTransform(self, page);
if (transformer_) |transformer| {
if (transformer.start) |start| {
try start.call(void, .{transform_controller});
}
}
return self;
}
pub fn initWithZigTransform(zig_transform: ZigTransformFn, page: *Page) !*TransformStream {
const readable = try ReadableStream.init(null, null, page);
const self = try page._factory.create(TransformStream{
._page = page,
._readable = readable,
._writable = undefined,
._controller = undefined,
});
const transform_controller = try TransformStreamDefaultController.init(self, null, null, zig_transform, page);
self._controller = transform_controller;
self._writable = try WritableStream.initForTransform(self, page);
return self;
}
pub fn transformWrite(self: *TransformStream, chunk: js.Value, page: *Page) !void {
if (self._controller._zig_transform_fn) |zig_fn| {
// Zig-level transform (used by TextEncoderStream etc.)
try zig_fn(self._controller, chunk);
return;
}
if (self._controller._transform_fn) |transform_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(transform_fn).call(void, .{ chunk, self._controller });
} else {
// Default transform: pass through
if (chunk.isString()) |str| {
const slice = try str.toSlice();
try self._readable._controller.enqueue(.{ .string = slice });
}
}
}
pub fn transformClose(self: *TransformStream, page: *Page) !void {
if (self._controller._flush_fn) |flush_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(flush_fn).call(void, .{self._controller});
}
try self._readable._controller.close();
}
pub fn getReadable(self: *const TransformStream) *ReadableStream {
return self._readable;
}
pub fn getWritable(self: *const TransformStream) *WritableStream {
return self._writable;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TransformStream);
pub const Meta = struct {
pub const name = "TransformStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(TransformStream.init, .{});
pub const readable = bridge.accessor(TransformStream.getReadable, null, .{});
pub const writable = bridge.accessor(TransformStream.getWritable, null, .{});
};
pub fn registerTypes() []const type {
return &.{
TransformStream,
TransformStreamDefaultController,
};
}
pub const TransformStreamDefaultController = struct {
_page: *Page,
_stream: *TransformStream,
_transform_fn: ?js.Function.Global,
_flush_fn: ?js.Function.Global,
_zig_transform_fn: ?ZigTransformFn,
pub fn init(
stream: *TransformStream,
transform_fn: ?js.Function.Global,
flush_fn: ?js.Function.Global,
zig_transform_fn: ?ZigTransformFn,
page: *Page,
) !*TransformStreamDefaultController {
return page._factory.create(TransformStreamDefaultController{
._page = page,
._stream = stream,
._transform_fn = transform_fn,
._flush_fn = flush_fn,
._zig_transform_fn = zig_transform_fn,
});
}
pub fn enqueue(self: *TransformStreamDefaultController, chunk: ReadableStreamDefaultController.Chunk) !void {
try self._stream._readable._controller.enqueue(chunk);
}
pub fn doError(self: *TransformStreamDefaultController, reason: []const u8) !void {
try self._stream._readable._controller.doError(reason);
}
pub fn terminate(self: *TransformStreamDefaultController) !void {
try self._stream._readable._controller.close();
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TransformStreamDefaultController);
pub const Meta = struct {
pub const name = "TransformStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const enqueue = bridge.function(TransformStreamDefaultController.enqueue, .{});
pub const @"error" = bridge.function(TransformStreamDefaultController.doError, .{});
pub const terminate = bridge.function(TransformStreamDefaultController.terminate, .{});
};
};

View File

@@ -0,0 +1,159 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStreamDefaultWriter = @import("WritableStreamDefaultWriter.zig");
const WritableStreamDefaultController = @import("WritableStreamDefaultController.zig");
const TransformStream = @import("TransformStream.zig");
const WritableStream = @This();
pub const State = enum {
writable,
closed,
errored,
};
_page: *Page,
_state: State,
_writer: ?*WritableStreamDefaultWriter,
_controller: *WritableStreamDefaultController,
_stored_error: ?[]const u8,
_write_fn: ?js.Function.Global,
_close_fn: ?js.Function.Global,
_transform_stream: ?*TransformStream,
const UnderlyingSink = struct {
start: ?js.Function = null,
write: ?js.Function.Global = null,
close: ?js.Function.Global = null,
abort: ?js.Function.Global = null,
type: ?[]const u8 = null,
};
pub fn init(sink_: ?UnderlyingSink, page: *Page) !*WritableStream {
const self = try page._factory.create(WritableStream{
._page = page,
._state = .writable,
._writer = null,
._controller = undefined,
._stored_error = null,
._write_fn = null,
._close_fn = null,
._transform_stream = null,
});
self._controller = try WritableStreamDefaultController.init(self, page);
if (sink_) |sink| {
if (sink.start) |start| {
try start.call(void, .{self._controller});
}
self._write_fn = sink.write;
self._close_fn = sink.close;
}
return self;
}
pub fn initForTransform(transform_stream: *TransformStream, page: *Page) !*WritableStream {
const self = try page._factory.create(WritableStream{
._page = page,
._state = .writable,
._writer = null,
._controller = undefined,
._stored_error = null,
._write_fn = null,
._close_fn = null,
._transform_stream = transform_stream,
});
self._controller = try WritableStreamDefaultController.init(self, page);
return self;
}
pub fn getWriter(self: *WritableStream, page: *Page) !*WritableStreamDefaultWriter {
if (self.getLocked()) {
return error.WriterLocked;
}
const writer = try WritableStreamDefaultWriter.init(self, page);
self._writer = writer;
return writer;
}
pub fn getLocked(self: *const WritableStream) bool {
return self._writer != null;
}
pub fn writeChunk(self: *WritableStream, chunk: js.Value, page: *Page) !void {
if (self._state != .writable) return;
if (self._transform_stream) |ts| {
try ts.transformWrite(chunk, page);
return;
}
if (self._write_fn) |write_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(write_fn).call(void, .{ chunk, self._controller });
}
}
pub fn closeStream(self: *WritableStream, page: *Page) !void {
if (self._state != .writable) return;
self._state = .closed;
if (self._transform_stream) |ts| {
try ts.transformClose(page);
return;
}
if (self._close_fn) |close_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(close_fn).call(void, .{self._controller});
}
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStream);
pub const Meta = struct {
pub const name = "WritableStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(WritableStream.init, .{});
pub const getWriter = bridge.function(WritableStream.getWriter, .{});
pub const locked = bridge.accessor(WritableStream.getLocked, null, .{});
};
pub fn registerTypes() []const type {
return &.{
WritableStream,
};
}

View File

@@ -0,0 +1,51 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStream = @import("WritableStream.zig");
const WritableStreamDefaultController = @This();
_page: *Page,
_stream: *WritableStream,
pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultController {
return page._factory.create(WritableStreamDefaultController{
._page = page,
._stream = stream,
});
}
pub fn doError(self: *WritableStreamDefaultController, reason: []const u8) void {
if (self._stream._state != .writable) return;
self._stream._state = .errored;
self._stream._stored_error = reason;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStreamDefaultController);
pub const Meta = struct {
pub const name = "WritableStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const @"error" = bridge.function(WritableStreamDefaultController.doError, .{});
};

View File

@@ -0,0 +1,101 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStream = @import("WritableStream.zig");
const WritableStreamDefaultWriter = @This();
_page: *Page,
_stream: ?*WritableStream,
pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultWriter {
return page._factory.create(WritableStreamDefaultWriter{
._page = page,
._stream = stream,
});
}
pub fn write(self: *WritableStreamDefaultWriter, chunk: js.Value, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state != .writable) {
return page.js.local.?.rejectPromise("Stream is not writable");
}
try stream.writeChunk(chunk, page);
return page.js.local.?.resolvePromise(.{});
}
pub fn close(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state != .writable) {
return page.js.local.?.rejectPromise("Stream is not writable");
}
try stream.closeStream(page);
return page.js.local.?.resolvePromise(.{});
}
pub fn releaseLock(self: *WritableStreamDefaultWriter) void {
if (self._stream) |stream| {
stream._writer = null;
self._stream = null;
}
}
pub fn getClosed(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state == .closed) {
return page.js.local.?.resolvePromise(.{});
}
return page.js.local.?.resolvePromise(.{});
}
pub fn getReady(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
_ = self;
return page.js.local.?.resolvePromise(.{});
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStreamDefaultWriter);
pub const Meta = struct {
pub const name = "WritableStreamDefaultWriter";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const write = bridge.function(WritableStreamDefaultWriter.write, .{});
pub const close = bridge.function(WritableStreamDefaultWriter.close, .{});
pub const releaseLock = bridge.function(WritableStreamDefaultWriter.releaseLock, .{});
pub const closed = bridge.accessor(WritableStreamDefaultWriter.getClosed, null, .{});
pub const ready = bridge.accessor(WritableStreamDefaultWriter.getReady, null, .{});
};