Merge pull request #1696 from lightpanda-io/textencoder-stream

Add TextEncoderStream and TextDecoderStream implementation
This commit is contained in:
Pierre Tachoire
2026-03-04 07:58:56 +01:00
committed by GitHub
16 changed files with 1229 additions and 11 deletions

View File

@@ -60,6 +60,11 @@ fn initWithContext(self: *Caller, ctx: *Context, v8_context: *const v8.Context)
ctx.local = &self.local; ctx.local = &self.local;
} }
pub fn initFromHandle(self: *Caller, handle: ?*const v8.FunctionCallbackInfo) void {
const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(handle).?;
self.init(isolate);
}
pub fn deinit(self: *Caller) void { pub fn deinit(self: *Caller) void {
const ctx = self.local.ctx; const ctx = self.local.ctx;
const call_depth = ctx.call_depth - 1; const call_depth = ctx.call_depth - 1;
@@ -441,6 +446,11 @@ pub const FunctionCallbackInfo = struct {
return .{ .local = local, .handle = v8.v8__FunctionCallbackInfo__INDEX(self.handle, @intCast(index)).? }; return .{ .local = local, .handle = v8.v8__FunctionCallbackInfo__INDEX(self.handle, @intCast(index)).? };
} }
pub fn getData(self: FunctionCallbackInfo) ?*anyopaque {
const data = v8.v8__FunctionCallbackInfo__Data(self.handle) orelse return null;
return v8.v8__External__Value(@ptrCast(data));
}
pub fn getThis(self: FunctionCallbackInfo) *const v8.Object { pub fn getThis(self: FunctionCallbackInfo) *const v8.Object {
return v8.v8__FunctionCallbackInfo__This(self.handle).?; return v8.v8__FunctionCallbackInfo__This(self.handle).?;
} }
@@ -499,6 +509,7 @@ pub const Function = struct {
as_typed_array: bool = false, as_typed_array: bool = false,
null_as_undefined: bool = false, null_as_undefined: bool = false,
cache: ?Caching = null, cache: ?Caching = null,
embedded_receiver: bool = false,
// We support two ways to cache a value directly into a v8::Object. The // We support two ways to cache a value directly into a v8::Object. The
// difference between the two is like the difference between a Map // difference between the two is like the difference between a Map
@@ -569,6 +580,9 @@ pub const Function = struct {
var args: ParameterTypes(F) = undefined; var args: ParameterTypes(F) = undefined;
if (comptime opts.static) { if (comptime opts.static) {
args = try getArgs(F, 0, local, info); args = try getArgs(F, 0, local, info);
} else if (comptime opts.embedded_receiver) {
args = try getArgs(F, 1, local, info);
@field(args, "0") = @ptrCast(@alignCast(info.getData() orelse unreachable));
} else { } else {
args = try getArgs(F, 1, local, info); args = try getArgs(F, 1, local, info);
@field(args, "0") = try TaggedOpaque.fromJS(*T, info.getThis()); @field(args, "0") = try TaggedOpaque.fromJS(*T, info.getThis());

View File

@@ -868,13 +868,12 @@ fn resolveDynamicModule(self: *Context, state: *DynamicModuleResolveState, modul
const then_callback = newFunctionWithData(local, struct { const then_callback = newFunctionWithData(local, struct {
pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?;
var c: Caller = undefined; var c: Caller = undefined;
c.init(isolate); c.initFromHandle(callback_handle);
defer c.deinit(); defer c.deinit();
const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; const info = Caller.FunctionCallbackInfo{ .handle = callback_handle.? };
const s: *DynamicModuleResolveState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); const s: *DynamicModuleResolveState = @ptrCast(@alignCast(info.getData() orelse return));
if (s.context_id != c.local.ctx.id) { if (s.context_id != c.local.ctx.id) {
// The microtask is tied to the isolate, not the context // The microtask is tied to the isolate, not the context
@@ -893,17 +892,15 @@ fn resolveDynamicModule(self: *Context, state: *DynamicModuleResolveState, modul
const catch_callback = newFunctionWithData(local, struct { const catch_callback = newFunctionWithData(local, struct {
pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?;
var c: Caller = undefined; var c: Caller = undefined;
c.init(isolate); c.initFromHandle(callback_handle);
defer c.deinit(); defer c.deinit();
const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; const info = Caller.FunctionCallbackInfo{ .handle = callback_handle.? };
const s: *DynamicModuleResolveState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); const s: *DynamicModuleResolveState = @ptrCast(@alignCast(info.getData() orelse return));
const l = &c.local; const l = &c.local;
const ctx = l.ctx; if (s.context_id != l.ctx.id) {
if (s.context_id != ctx.id) {
return; return;
} }

View File

@@ -82,6 +82,20 @@ pub fn createTypedArray(self: *const Local, comptime array_type: js.ArrayType, s
return .init(self, size); return .init(self, size);
} }
pub fn newCallback(
self: *const Local,
callback: anytype,
data: anytype,
) js.Function {
const external = self.isolate.createExternal(data);
const handle = v8.v8__Function__New__DEFAULT2(self.handle, struct {
fn wrap(info_handle: ?*const js.v8.FunctionCallbackInfo) callconv(.c) void {
Caller.Function.call(@TypeOf(data), info_handle.?, callback, .{ .embedded_receiver = true });
}
}.wrap, @ptrCast(external)).?;
return .{ .local = self, .handle = handle };
}
pub fn runMacrotasks(self: *const Local) void { pub fn runMacrotasks(self: *const Local) void {
const env = self.ctx.env; const env = self.ctx.env;
env.pumpMessageLoop(); env.pumpMessageLoop();

View File

@@ -827,6 +827,8 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/element/svg/Generic.zig"), @import("../webapi/element/svg/Generic.zig"),
@import("../webapi/encoding/TextDecoder.zig"), @import("../webapi/encoding/TextDecoder.zig"),
@import("../webapi/encoding/TextEncoder.zig"), @import("../webapi/encoding/TextEncoder.zig"),
@import("../webapi/encoding/TextEncoderStream.zig"),
@import("../webapi/encoding/TextDecoderStream.zig"),
@import("../webapi/Event.zig"), @import("../webapi/Event.zig"),
@import("../webapi/event/CompositionEvent.zig"), @import("../webapi/event/CompositionEvent.zig"),
@import("../webapi/event/CustomEvent.zig"), @import("../webapi/event/CustomEvent.zig"),
@@ -863,6 +865,10 @@ pub const JsApis = flattenTypes(&.{
@import("../webapi/streams/ReadableStream.zig"), @import("../webapi/streams/ReadableStream.zig"),
@import("../webapi/streams/ReadableStreamDefaultReader.zig"), @import("../webapi/streams/ReadableStreamDefaultReader.zig"),
@import("../webapi/streams/ReadableStreamDefaultController.zig"), @import("../webapi/streams/ReadableStreamDefaultController.zig"),
@import("../webapi/streams/WritableStream.zig"),
@import("../webapi/streams/WritableStreamDefaultWriter.zig"),
@import("../webapi/streams/WritableStreamDefaultController.zig"),
@import("../webapi/streams/TransformStream.zig"),
@import("../webapi/Node.zig"), @import("../webapi/Node.zig"),
@import("../webapi/storage/storage.zig"), @import("../webapi/storage/storage.zig"),
@import("../webapi/URL.zig"), @import("../webapi/URL.zig"),

View File

@@ -301,3 +301,74 @@
testing.expectEqual(false, data3.done); testing.expectEqual(false, data3.done);
})(); })();
</script> </script>
<script id=enqueue_preserves_number>
(async function() {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(42);
controller.enqueue(0);
controller.enqueue(3.14);
controller.close();
}
});
const reader = stream.getReader();
const r1 = await reader.read();
testing.expectEqual(false, r1.done);
testing.expectEqual('number', typeof r1.value);
testing.expectEqual(42, r1.value);
const r2 = await reader.read();
testing.expectEqual('number', typeof r2.value);
testing.expectEqual(0, r2.value);
const r3 = await reader.read();
testing.expectEqual('number', typeof r3.value);
testing.expectEqual(3.14, r3.value);
const r4 = await reader.read();
testing.expectEqual(true, r4.done);
})();
</script>
<script id=enqueue_preserves_bool>
(async function() {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(true);
controller.enqueue(false);
controller.close();
}
});
const reader = stream.getReader();
const r1 = await reader.read();
testing.expectEqual('boolean', typeof r1.value);
testing.expectEqual(true, r1.value);
const r2 = await reader.read();
testing.expectEqual('boolean', typeof r2.value);
testing.expectEqual(false, r2.value);
})();
</script>
<script id=enqueue_preserves_object>
(async function() {
const stream = new ReadableStream({
start(controller) {
controller.enqueue({ key: 'value', num: 7 });
controller.close();
}
});
const reader = stream.getReader();
const r1 = await reader.read();
testing.expectEqual('object', typeof r1.value);
testing.expectEqual('value', r1.value.key);
testing.expectEqual(7, r1.value.num);
})();
</script>

View File

@@ -0,0 +1,82 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=text_decoder_stream_encoding>
{
const tds = new TextDecoderStream();
testing.expectEqual('utf-8', tds.encoding);
testing.expectEqual('object', typeof tds.readable);
testing.expectEqual('object', typeof tds.writable);
testing.expectEqual(false, tds.fatal);
testing.expectEqual(false, tds.ignoreBOM);
}
</script>
<script id=text_decoder_stream_with_label>
{
const tds = new TextDecoderStream('utf-8');
testing.expectEqual('utf-8', tds.encoding);
}
</script>
<script id=text_decoder_stream_with_opts>
{
const tds = new TextDecoderStream('utf-8', { fatal: true, ignoreBOM: true });
testing.expectEqual(true, tds.fatal);
testing.expectEqual(true, tds.ignoreBOM);
}
</script>
<script id=text_decoder_stream_invalid_label>
{
let errorThrown = false;
try {
new TextDecoderStream('windows-1252');
} catch (e) {
errorThrown = true;
}
testing.expectEqual(true, errorThrown);
}
</script>
<script id=text_decoder_stream_decode>
(async function() {
const tds = new TextDecoderStream();
const writer = tds.writable.getWriter();
const reader = tds.readable.getReader();
// 'hello' in UTF-8 bytes
const bytes = new Uint8Array([104, 101, 108, 108, 111]);
await writer.write(bytes);
await writer.close();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('hello', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=text_decoder_stream_empty_chunk>
(async function() {
const tds = new TextDecoderStream();
const writer = tds.writable.getWriter();
const reader = tds.readable.getReader();
// Write an empty chunk followed by real data
await writer.write(new Uint8Array([]));
await writer.write(new Uint8Array([104, 105]));
await writer.close();
// Empty chunk should be filtered out; first read gets "hi"
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('hi', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>

View File

@@ -0,0 +1,164 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=transform_stream_basic>
{
const ts = new TransformStream();
testing.expectEqual('object', typeof ts);
testing.expectEqual('object', typeof ts.readable);
testing.expectEqual('object', typeof ts.writable);
}
</script>
<script id=transform_stream_with_transformer>
(async function() {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();
await writer.write('hello');
await writer.close();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('HELLO', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=writable_stream_basic>
{
const ws = new WritableStream();
testing.expectEqual('object', typeof ws);
testing.expectEqual(false, ws.locked);
}
</script>
<script id=writable_stream_writer>
{
const ws = new WritableStream();
const writer = ws.getWriter();
testing.expectEqual('object', typeof writer);
testing.expectEqual(true, ws.locked);
}
</script>
<script id=writable_stream_writer_desired_size>
{
const ws = new WritableStream();
const writer = ws.getWriter();
testing.expectEqual(1, writer.desiredSize);
}
</script>
<script id=text_encoder_stream_encoding>
{
const tes = new TextEncoderStream();
testing.expectEqual('utf-8', tes.encoding);
testing.expectEqual('object', typeof tes.readable);
testing.expectEqual('object', typeof tes.writable);
}
</script>
<script id=text_encoder_stream_encode>
(async function() {
const tes = new TextEncoderStream();
const writer = tes.writable.getWriter();
const reader = tes.readable.getReader();
await writer.write('hi');
await writer.close();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual(true, result.value instanceof Uint8Array);
// 'hi' in UTF-8 is [104, 105]
testing.expectEqual(104, result.value[0]);
testing.expectEqual(105, result.value[1]);
testing.expectEqual(2, result.value.length);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=pipe_through_basic>
(async function() {
const input = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.close();
}
});
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const output = input.pipeThrough(ts);
const reader = output.getReader();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('HELLO', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>
<script id=pipe_to_basic>
(async function() {
const chunks = [];
const input = new ReadableStream({
start(controller) {
controller.enqueue('a');
controller.enqueue('b');
controller.close();
}
});
const ws = new WritableStream({
write(chunk) {
chunks.push(chunk);
}
});
await input.pipeTo(ws);
testing.expectEqual(2, chunks.length);
testing.expectEqual('a', chunks[0]);
testing.expectEqual('b', chunks[1]);
})();
</script>
<script id=pipe_through_text_decoder>
(async function() {
const bytes = new Uint8Array([104, 101, 108, 108, 111]);
const input = new ReadableStream({
start(controller) {
controller.enqueue(bytes);
controller.close();
}
});
const output = input.pipeThrough(new TextDecoderStream());
const reader = output.getReader();
const result = await reader.read();
testing.expectEqual(false, result.done);
testing.expectEqual('hello', result.value);
const result2 = await reader.read();
testing.expectEqual(true, result2.done);
})();
</script>

View File

@@ -0,0 +1,127 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const std = @import("std");
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("../streams/ReadableStream.zig");
const WritableStream = @import("../streams/WritableStream.zig");
const TransformStream = @import("../streams/TransformStream.zig");
const TextDecoderStream = @This();
_transform: *TransformStream,
_fatal: bool,
_ignore_bom: bool,
const Label = enum {
utf8,
@"utf-8",
@"unicode-1-1-utf-8",
};
const InitOpts = struct {
fatal: bool = false,
ignoreBOM: bool = false,
};
pub fn init(label_: ?[]const u8, opts_: ?InitOpts, page: *Page) !TextDecoderStream {
if (label_) |label| {
_ = std.meta.stringToEnum(Label, label) orelse return error.RangeError;
}
const opts = opts_ orelse InitOpts{};
const decodeFn: TransformStream.ZigTransformFn = blk: {
if (opts.ignoreBOM) {
break :blk struct {
fn decode(controller: *TransformStream.DefaultController, chunk: js.Value) !void {
return decodeTransform(controller, chunk, true);
}
}.decode;
} else {
break :blk struct {
fn decode(controller: *TransformStream.DefaultController, chunk: js.Value) !void {
return decodeTransform(controller, chunk, false);
}
}.decode;
}
};
const transform = try TransformStream.initWithZigTransform(decodeFn, page);
return .{
._transform = transform,
._fatal = opts.fatal,
._ignore_bom = opts.ignoreBOM,
};
}
fn decodeTransform(controller: *TransformStream.DefaultController, chunk: js.Value, ignoreBOM: bool) !void {
// chunk should be a Uint8Array; decode it as UTF-8 string
const typed_array = try chunk.toZig(js.TypedArray(u8));
var input = typed_array.values;
// Strip UTF-8 BOM if present
if (ignoreBOM == false and std.mem.startsWith(u8, input, &.{ 0xEF, 0xBB, 0xBF })) {
input = input[3..];
}
// Per spec, empty chunks produce no output
if (input.len == 0) return;
try controller.enqueue(.{ .string = input });
}
pub fn getReadable(self: *const TextDecoderStream) *ReadableStream {
return self._transform.getReadable();
}
pub fn getWritable(self: *const TextDecoderStream) *WritableStream {
return self._transform.getWritable();
}
pub fn getFatal(self: *const TextDecoderStream) bool {
return self._fatal;
}
pub fn getIgnoreBOM(self: *const TextDecoderStream) bool {
return self._ignore_bom;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TextDecoderStream);
pub const Meta = struct {
pub const name = "TextDecoderStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(TextDecoderStream.init, .{});
pub const encoding = bridge.property("utf-8", .{ .template = false });
pub const readable = bridge.accessor(TextDecoderStream.getReadable, null, .{});
pub const writable = bridge.accessor(TextDecoderStream.getWritable, null, .{});
pub const fatal = bridge.accessor(TextDecoderStream.getFatal, null, .{});
pub const ignoreBOM = bridge.accessor(TextDecoderStream.getIgnoreBOM, null, .{});
};
const testing = @import("../../../testing.zig");
test "WebApi: TextDecoderStream" {
try testing.htmlRunner("streams/text_decoder_stream.html", .{});
}

View File

@@ -0,0 +1,70 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("../streams/ReadableStream.zig");
const WritableStream = @import("../streams/WritableStream.zig");
const TransformStream = @import("../streams/TransformStream.zig");
const TextEncoderStream = @This();
_transform: *TransformStream,
pub fn init(page: *Page) !TextEncoderStream {
const transform = try TransformStream.initWithZigTransform(&encodeTransform, page);
return .{
._transform = transform,
};
}
fn encodeTransform(controller: *TransformStream.DefaultController, chunk: js.Value) !void {
// chunk should be a JS string; encode it as UTF-8 bytes (Uint8Array)
const str = chunk.isString() orelse return error.InvalidChunk;
const slice = try str.toSlice();
try controller.enqueue(.{ .uint8array = .{ .values = slice } });
}
pub fn getReadable(self: *const TextEncoderStream) *ReadableStream {
return self._transform.getReadable();
}
pub fn getWritable(self: *const TextEncoderStream) *WritableStream {
return self._transform.getWritable();
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TextEncoderStream);
pub const Meta = struct {
pub const name = "TextEncoderStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(TextEncoderStream.init, .{});
pub const encoding = bridge.property("utf-8", .{ .template = false });
pub const readable = bridge.accessor(TextEncoderStream.getReadable, null, .{});
pub const writable = bridge.accessor(TextEncoderStream.getWritable, null, .{});
};
const testing = @import("../../../testing.zig");
test "WebApi: TextEncoderStream" {
try testing.htmlRunner("streams/transform_stream.html", .{});
}

View File

@@ -24,6 +24,7 @@ const Page = @import("../../Page.zig");
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const WritableStream = @import("WritableStream.zig");
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -233,6 +234,126 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi
return resolver.promise(); return resolver.promise();
} }
/// pipeThrough(transform) — pipes this readable stream through a transform stream,
/// returning the readable side. `transform` is a JS object with `readable` and `writable` properties.
const PipeTransform = struct {
writable: *WritableStream,
readable: *ReadableStream,
};
pub fn pipeThrough(self: *ReadableStream, transform: PipeTransform, page: *Page) !*ReadableStream {
if (self.getLocked()) {
return error.ReaderLocked;
}
// Start async piping from this stream to the writable side
try PipeState.startPipe(self, transform.writable, null, page);
return transform.readable;
}
/// pipeTo(writable) — pipes this readable stream to a writable stream.
/// Returns a promise that resolves when piping is complete.
pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) !js.Promise {
if (self.getLocked()) {
return page.js.local.?.rejectPromise("ReadableStream is locked");
}
const local = page.js.local.?;
var pipe_resolver = local.createPromiseResolver();
const promise = pipe_resolver.promise();
const persisted_resolver = try pipe_resolver.persist();
try PipeState.startPipe(self, destination, persisted_resolver, page);
return promise;
}
/// State for an async pipe operation.
const PipeState = struct {
reader: *ReadableStreamDefaultReader,
writable: *WritableStream,
context_id: usize,
resolver: ?js.PromiseResolver.Global,
fn startPipe(
stream: *ReadableStream,
writable: *WritableStream,
resolver: ?js.PromiseResolver.Global,
page: *Page,
) !void {
const reader = try stream.getReader(page);
const state = try page.arena.create(PipeState);
state.* = .{
.reader = reader,
.writable = writable,
.context_id = page.js.id,
.resolver = resolver,
};
try state.pumpRead(page);
}
fn pumpRead(state: *PipeState, page: *Page) !void {
const local = page.js.local.?;
// Call reader.read() which returns a Promise
const read_promise = try state.reader.read(page);
// Create JS callback functions for .then() and .catch()
const then_fn = local.newCallback(onReadFulfilled, state);
const catch_fn = local.newCallback(onReadRejected, state);
_ = read_promise.thenAndCatch(then_fn, catch_fn) catch {
state.finish(local);
};
}
const ReadData = struct {
done: bool,
value: js.Value,
};
fn onReadFulfilled(self: *PipeState, data_: ?ReadData, page: *Page) void {
const local = page.js.local.?;
const data = data_ orelse {
return self.finish(local);
};
if (data.done) {
// Stream is finished, close the writable side
self.writable.closeStream(page) catch {};
self.reader.releaseLock();
if (self.resolver) |r| {
local.toLocal(r).resolve("pipeTo complete", {});
}
return;
}
const value = data.value;
if (value.isUndefined()) {
return self.finish(local);
}
self.writable.writeChunk(value, page) catch {
return self.finish(local);
};
// Continue reading the next chunk
self.pumpRead(page) catch {
self.finish(local);
};
}
fn onReadRejected(self: *PipeState, page: *Page) void {
self.finish(page.js.local.?);
}
fn finish(self: *PipeState, local: *const js.Local) void {
self.reader.releaseLock();
if (self.resolver) |r| {
local.toLocal(r).resolve("pipe finished", {});
}
}
};
const Cancel = struct { const Cancel = struct {
callback: ?js.Function.Global = null, callback: ?js.Function.Global = null,
reason: ?[]const u8 = null, reason: ?[]const u8 = null,
@@ -251,6 +372,8 @@ pub const JsApi = struct {
pub const constructor = bridge.constructor(ReadableStream.init, .{}); pub const constructor = bridge.constructor(ReadableStream.init, .{});
pub const cancel = bridge.function(ReadableStream.cancel, .{}); pub const cancel = bridge.function(ReadableStream.cancel, .{});
pub const getReader = bridge.function(ReadableStream.getReader, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{});
pub const pipeThrough = bridge.function(ReadableStream.pipeThrough, .{});
pub const pipeTo = bridge.function(ReadableStream.pipeTo, .{});
pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{}); pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{});
pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true });
}; };

View File

@@ -33,11 +33,13 @@ pub const Chunk = union(enum) {
// the order matters, sorry. // the order matters, sorry.
uint8array: js.TypedArray(u8), uint8array: js.TypedArray(u8),
string: []const u8, string: []const u8,
js_value: js.Value.Global,
pub fn dupe(self: Chunk, allocator: std.mem.Allocator) !Chunk { pub fn dupe(self: Chunk, allocator: std.mem.Allocator) !Chunk {
return switch (self) { return switch (self) {
.string => |str| .{ .string = try allocator.dupe(u8, str) }, .string => |str| .{ .string = try allocator.dupe(u8, str) },
.uint8array => |arr| .{ .uint8array = try arr.dupe(allocator) }, .uint8array => |arr| .{ .uint8array = try arr.dupe(allocator) },
.js_value => |val| .{ .js_value = val },
}; };
} }
}; };
@@ -98,6 +100,40 @@ pub fn enqueue(self: *ReadableStreamDefaultController, chunk: Chunk) !void {
ls.toLocal(resolver).resolve("stream enqueue", result); ls.toLocal(resolver).resolve("stream enqueue", result);
} }
/// Enqueue a raw JS value, preserving its type (number, bool, object, etc.).
/// Used by the JS-facing API; internal Zig callers should use enqueue(Chunk).
pub fn enqueueValue(self: *ReadableStreamDefaultController, value: js.Value) !void {
if (self._stream._state != .readable) {
return error.StreamNotReadable;
}
if (self._pending_reads.items.len == 0) {
const persisted = try value.persist();
try self._queue.append(self._arena, .{ .js_value = persisted });
return;
}
const resolver = self._pending_reads.orderedRemove(0);
const persisted = try value.persist();
const result = ReadableStreamDefaultReader.ReadResult{
.done = false,
.value = .{ .js_value = persisted },
};
if (comptime IS_DEBUG) {
if (self._page.js.local == null) {
log.fatal(.bug, "null context scope", .{ .src = "ReadableStreamDefaultController.enqueueValue", .url = self._page.url });
std.debug.assert(self._page.js.local != null);
}
}
var ls: js.Local.Scope = undefined;
self._page.js.localScope(&ls);
defer ls.deinit();
ls.toLocal(resolver).resolve("stream enqueue value", result);
}
pub fn close(self: *ReadableStreamDefaultController) !void { pub fn close(self: *ReadableStreamDefaultController) !void {
if (self._stream._state != .readable) { if (self._stream._state != .readable) {
return error.StreamNotReadable; return error.StreamNotReadable;
@@ -176,7 +212,7 @@ pub const JsApi = struct {
pub var class_id: bridge.ClassId = undefined; pub var class_id: bridge.ClassId = undefined;
}; };
pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueue, .{}); pub const enqueue = bridge.function(ReadableStreamDefaultController.enqueueValue, .{});
pub const close = bridge.function(ReadableStreamDefaultController.close, .{}); pub const close = bridge.function(ReadableStreamDefaultController.close, .{});
pub const @"error" = bridge.function(ReadableStreamDefaultController.doError, .{}); pub const @"error" = bridge.function(ReadableStreamDefaultController.doError, .{});
pub const desiredSize = bridge.accessor(ReadableStreamDefaultController.getDesiredSize, null, .{}); pub const desiredSize = bridge.accessor(ReadableStreamDefaultController.getDesiredSize, null, .{});

View File

@@ -44,11 +44,13 @@ pub const ReadResult = struct {
empty, empty,
string: []const u8, string: []const u8,
uint8array: js.TypedArray(u8), uint8array: js.TypedArray(u8),
js_value: js.Value.Global,
pub fn fromChunk(chunk: ReadableStreamDefaultController.Chunk) Chunk { pub fn fromChunk(chunk: ReadableStreamDefaultController.Chunk) Chunk {
return switch (chunk) { return switch (chunk) {
.string => |s| .{ .string = s }, .string => |s| .{ .string = s },
.uint8array => |arr| .{ .uint8array = arr }, .uint8array => |arr| .{ .uint8array = arr },
.js_value => |val| .{ .js_value = val },
}; };
} }
}; };

View File

@@ -0,0 +1,198 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const ReadableStream = @import("ReadableStream.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const WritableStream = @import("WritableStream.zig");
const TransformStream = @This();
pub const DefaultController = TransformStreamDefaultController;
pub const ZigTransformFn = *const fn (*TransformStreamDefaultController, js.Value) anyerror!void;
_readable: *ReadableStream,
_writable: *WritableStream,
_controller: *TransformStreamDefaultController,
const Transformer = struct {
start: ?js.Function = null,
transform: ?js.Function.Global = null,
flush: ?js.Function.Global = null,
};
pub fn init(transformer_: ?Transformer, page: *Page) !*TransformStream {
const readable = try ReadableStream.init(null, null, page);
const self = try page._factory.create(TransformStream{
._readable = readable,
._writable = undefined,
._controller = undefined,
});
const transform_controller = try TransformStreamDefaultController.init(
self,
if (transformer_) |t| t.transform else null,
if (transformer_) |t| t.flush else null,
null,
page,
);
self._controller = transform_controller;
self._writable = try WritableStream.initForTransform(self, page);
if (transformer_) |transformer| {
if (transformer.start) |start| {
try start.call(void, .{transform_controller});
}
}
return self;
}
pub fn initWithZigTransform(zig_transform: ZigTransformFn, page: *Page) !*TransformStream {
const readable = try ReadableStream.init(null, null, page);
const self = try page._factory.create(TransformStream{
._readable = readable,
._writable = undefined,
._controller = undefined,
});
const transform_controller = try TransformStreamDefaultController.init(self, null, null, zig_transform, page);
self._controller = transform_controller;
self._writable = try WritableStream.initForTransform(self, page);
return self;
}
pub fn transformWrite(self: *TransformStream, chunk: js.Value, page: *Page) !void {
if (self._controller._zig_transform_fn) |zig_fn| {
// Zig-level transform (used by TextEncoderStream etc.)
try zig_fn(self._controller, chunk);
return;
}
if (self._controller._transform_fn) |transform_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(transform_fn).call(void, .{ chunk, self._controller });
} else {
try self._readable._controller.enqueue(.{ .string = try chunk.toStringSlice() });
}
}
pub fn transformClose(self: *TransformStream, page: *Page) !void {
if (self._controller._flush_fn) |flush_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(flush_fn).call(void, .{self._controller});
}
try self._readable._controller.close();
}
pub fn getReadable(self: *const TransformStream) *ReadableStream {
return self._readable;
}
pub fn getWritable(self: *const TransformStream) *WritableStream {
return self._writable;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TransformStream);
pub const Meta = struct {
pub const name = "TransformStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(TransformStream.init, .{});
pub const readable = bridge.accessor(TransformStream.getReadable, null, .{});
pub const writable = bridge.accessor(TransformStream.getWritable, null, .{});
};
pub fn registerTypes() []const type {
return &.{
TransformStream,
TransformStreamDefaultController,
};
}
pub const TransformStreamDefaultController = struct {
_stream: *TransformStream,
_transform_fn: ?js.Function.Global,
_flush_fn: ?js.Function.Global,
_zig_transform_fn: ?ZigTransformFn,
pub fn init(
stream: *TransformStream,
transform_fn: ?js.Function.Global,
flush_fn: ?js.Function.Global,
zig_transform_fn: ?ZigTransformFn,
page: *Page,
) !*TransformStreamDefaultController {
return page._factory.create(TransformStreamDefaultController{
._stream = stream,
._transform_fn = transform_fn,
._flush_fn = flush_fn,
._zig_transform_fn = zig_transform_fn,
});
}
pub fn enqueue(self: *TransformStreamDefaultController, chunk: ReadableStreamDefaultController.Chunk) !void {
try self._stream._readable._controller.enqueue(chunk);
}
/// Enqueue a raw JS value, preserving its type. Used by the JS-facing API.
pub fn enqueueValue(self: *TransformStreamDefaultController, value: js.Value) !void {
try self._stream._readable._controller.enqueueValue(value);
}
pub fn doError(self: *TransformStreamDefaultController, reason: []const u8) !void {
try self._stream._readable._controller.doError(reason);
}
pub fn terminate(self: *TransformStreamDefaultController) !void {
try self._stream._readable._controller.close();
}
pub const JsApi = struct {
pub const bridge = js.Bridge(TransformStreamDefaultController);
pub const Meta = struct {
pub const name = "TransformStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const enqueue = bridge.function(TransformStreamDefaultController.enqueueValue, .{});
pub const @"error" = bridge.function(TransformStreamDefaultController.doError, .{});
pub const terminate = bridge.function(TransformStreamDefaultController.terminate, .{});
};
};

View File

@@ -0,0 +1,156 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStreamDefaultWriter = @import("WritableStreamDefaultWriter.zig");
const WritableStreamDefaultController = @import("WritableStreamDefaultController.zig");
const TransformStream = @import("TransformStream.zig");
const WritableStream = @This();
pub const State = enum {
writable,
closed,
errored,
};
_state: State,
_writer: ?*WritableStreamDefaultWriter,
_controller: *WritableStreamDefaultController,
_stored_error: ?[]const u8,
_write_fn: ?js.Function.Global,
_close_fn: ?js.Function.Global,
_transform_stream: ?*TransformStream,
const UnderlyingSink = struct {
start: ?js.Function = null,
write: ?js.Function.Global = null,
close: ?js.Function.Global = null,
abort: ?js.Function.Global = null,
type: ?[]const u8 = null,
};
pub fn init(sink_: ?UnderlyingSink, page: *Page) !*WritableStream {
const self = try page._factory.create(WritableStream{
._state = .writable,
._writer = null,
._controller = undefined,
._stored_error = null,
._write_fn = null,
._close_fn = null,
._transform_stream = null,
});
self._controller = try WritableStreamDefaultController.init(self, page);
if (sink_) |sink| {
if (sink.start) |start| {
try start.call(void, .{self._controller});
}
self._write_fn = sink.write;
self._close_fn = sink.close;
}
return self;
}
pub fn initForTransform(transform_stream: *TransformStream, page: *Page) !*WritableStream {
const self = try page._factory.create(WritableStream{
._state = .writable,
._writer = null,
._controller = undefined,
._stored_error = null,
._write_fn = null,
._close_fn = null,
._transform_stream = transform_stream,
});
self._controller = try WritableStreamDefaultController.init(self, page);
return self;
}
pub fn getWriter(self: *WritableStream, page: *Page) !*WritableStreamDefaultWriter {
if (self.getLocked()) {
return error.WriterLocked;
}
const writer = try WritableStreamDefaultWriter.init(self, page);
self._writer = writer;
return writer;
}
pub fn getLocked(self: *const WritableStream) bool {
return self._writer != null;
}
pub fn writeChunk(self: *WritableStream, chunk: js.Value, page: *Page) !void {
if (self._state != .writable) return;
if (self._transform_stream) |ts| {
try ts.transformWrite(chunk, page);
return;
}
if (self._write_fn) |write_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(write_fn).call(void, .{ chunk, self._controller });
}
}
pub fn closeStream(self: *WritableStream, page: *Page) !void {
if (self._state != .writable) return;
self._state = .closed;
if (self._transform_stream) |ts| {
try ts.transformClose(page);
return;
}
if (self._close_fn) |close_fn| {
var ls: js.Local.Scope = undefined;
page.js.localScope(&ls);
defer ls.deinit();
try ls.toLocal(close_fn).call(void, .{self._controller});
}
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStream);
pub const Meta = struct {
pub const name = "WritableStream";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const constructor = bridge.constructor(WritableStream.init, .{});
pub const getWriter = bridge.function(WritableStream.getWriter, .{});
pub const locked = bridge.accessor(WritableStream.getLocked, null, .{});
};
pub fn registerTypes() []const type {
return &.{
WritableStream,
};
}

View File

@@ -0,0 +1,49 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStream = @import("WritableStream.zig");
const WritableStreamDefaultController = @This();
_stream: *WritableStream,
pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultController {
return page._factory.create(WritableStreamDefaultController{
._stream = stream,
});
}
pub fn doError(self: *WritableStreamDefaultController, reason: []const u8) void {
if (self._stream._state != .writable) return;
self._stream._state = .errored;
self._stream._stored_error = reason;
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStreamDefaultController);
pub const Meta = struct {
pub const name = "WritableStreamDefaultController";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const @"error" = bridge.function(WritableStreamDefaultController.doError, .{});
};

View File

@@ -0,0 +1,109 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
const js = @import("../../js/js.zig");
const Page = @import("../../Page.zig");
const WritableStream = @import("WritableStream.zig");
const WritableStreamDefaultWriter = @This();
_stream: ?*WritableStream,
pub fn init(stream: *WritableStream, page: *Page) !*WritableStreamDefaultWriter {
return page._factory.create(WritableStreamDefaultWriter{
._stream = stream,
});
}
pub fn write(self: *WritableStreamDefaultWriter, chunk: js.Value, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state != .writable) {
return page.js.local.?.rejectPromise("Stream is not writable");
}
try stream.writeChunk(chunk, page);
return page.js.local.?.resolvePromise(.{});
}
pub fn close(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state != .writable) {
return page.js.local.?.rejectPromise("Stream is not writable");
}
try stream.closeStream(page);
return page.js.local.?.resolvePromise(.{});
}
pub fn releaseLock(self: *WritableStreamDefaultWriter) void {
if (self._stream) |stream| {
stream._writer = null;
self._stream = null;
}
}
pub fn getClosed(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
const stream = self._stream orelse {
return page.js.local.?.rejectPromise("Writer has been released");
};
if (stream._state == .closed) {
return page.js.local.?.resolvePromise(.{});
}
return page.js.local.?.resolvePromise(.{});
}
pub fn getDesiredSize(self: *const WritableStreamDefaultWriter) ?i32 {
const stream = self._stream orelse return null;
return switch (stream._state) {
.writable => 1,
.closed => 0,
.errored => null,
};
}
pub fn getReady(self: *WritableStreamDefaultWriter, page: *Page) !js.Promise {
_ = self;
return page.js.local.?.resolvePromise(.{});
}
pub const JsApi = struct {
pub const bridge = js.Bridge(WritableStreamDefaultWriter);
pub const Meta = struct {
pub const name = "WritableStreamDefaultWriter";
pub const prototype_chain = bridge.prototypeChain();
pub var class_id: bridge.ClassId = undefined;
};
pub const write = bridge.function(WritableStreamDefaultWriter.write, .{});
pub const close = bridge.function(WritableStreamDefaultWriter.close, .{});
pub const releaseLock = bridge.function(WritableStreamDefaultWriter.releaseLock, .{});
pub const closed = bridge.accessor(WritableStreamDefaultWriter.getClosed, null, .{});
pub const ready = bridge.accessor(WritableStreamDefaultWriter.getReady, null, .{});
pub const desiredSize = bridge.accessor(WritableStreamDefaultWriter.getDesiredSize, null, .{});
};