Add pipeThrough and pipeTo to ReadableStream

Implement synchronous piping that drains queued chunks from a
ReadableStream into a WritableStream. pipeThrough accepts any
{readable, writable} pair (TransformStream, TextDecoderStream, etc.)
and returns the readable side. pipeTo writes all chunks to a
WritableStream and resolves when complete.
This commit is contained in:
Pierre Tachoire
2026-03-02 12:06:18 +01:00
parent 23d322452a
commit 6ed011e2f8
2 changed files with 132 additions and 0 deletions

View File

@@ -81,3 +81,76 @@
testing.expectEqual(true, result2.done); testing.expectEqual(true, result2.done);
})(); })();
</script> </script>
<script id=pipe_through_basic>
(async function() {
  // Source stream that emits a single chunk and then closes.
  const source = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue('hello');
      ctrl.close();
    }
  });
  // Transform that uppercases every chunk passing through it.
  const upper = new TransformStream({
    transform(chunk, ctrl) {
      ctrl.enqueue(chunk.toUpperCase());
    }
  });
  const reader = source.pipeThrough(upper).getReader();
  const first = await reader.read();
  testing.expectEqual(false, first.done);
  testing.expectEqual('HELLO', first.value);
  const last = await reader.read();
  testing.expectEqual(true, last.done);
})();
</script>
<script id=pipe_to_basic>
(async function() {
  // Collects everything the writable side receives.
  const received = [];
  const source = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue('a');
      ctrl.enqueue('b');
      ctrl.close();
    }
  });
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    }
  });
  await source.pipeTo(sink);
  testing.expectEqual(2, received.length);
  testing.expectEqual('a', received[0]);
  testing.expectEqual('b', received[1]);
})();
</script>
<script id=pipe_through_text_decoder>
(async function() {
  // 'hello' encoded as UTF-8 bytes.
  const payload = new Uint8Array([104, 101, 108, 108, 111]);
  const source = new ReadableStream({
    start(ctrl) {
      ctrl.enqueue(payload);
      ctrl.close();
    }
  });
  const reader = source.pipeThrough(new TextDecoderStream()).getReader();
  const first = await reader.read();
  testing.expectEqual(false, first.done);
  testing.expectEqual('hello', first.value);
  const last = await reader.read();
  testing.expectEqual(true, last.done);
})();
</script>

View File

@@ -24,6 +24,7 @@ const Page = @import("../../Page.zig");
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
const WritableStream = @import("WritableStream.zig");
const IS_DEBUG = @import("builtin").mode == .Debug; const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -233,6 +234,62 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi
return resolver.promise(); return resolver.promise();
} }
/// pipeThrough(transform) — pipes this readable stream through a transform stream,
/// returning the readable side. `transform` is any JS object exposing `readable`
/// and `writable` properties (TransformStream, TextDecoderStream, ...).
/// Errors with ReaderLocked when this stream is already locked, and with
/// InvalidArgument when `transform` is not an object.
pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*ReadableStream {
    if (self.getLocked()) return error.ReaderLocked;
    if (!transform.isObject()) return error.InvalidArgument;

    const pair = transform.toObject();
    // Fetch `writable` before `readable`, matching the original lookup order
    // (order is observable if these properties are implemented as getters).
    const writable_side = try pair.get("writable");
    const readable_side = try pair.get("readable");
    const sink = try writable_side.toZig(*WritableStream);
    const result = try readable_side.toZig(*ReadableStream);

    // Synchronously drain whatever is already queued into the writable side.
    try self.pipeToWritable(sink, page);
    return result;
}
/// pipeTo(writable) — pipes this readable stream to a writable stream.
/// Returns a promise that resolves when piping is complete; returns a
/// rejected promise (rather than erroring) when this stream is locked.
pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) !js.Promise {
    const local = page.js.local.?;
    if (self.getLocked()) {
        return local.rejectPromise("ReadableStream is locked");
    }
    try self.pipeToWritable(destination, page);
    return local.resolvePromise(.{});
}
/// Internal: synchronously drain every chunk currently queued on this stream
/// into `writable`, then propagate closure — if this readable is already
/// closed, the writable side is closed as well.
fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void {
    const local = page.js.local.?;

    while (self._controller.dequeue()) |item| {
        // Each queued chunk must be converted back to a JS value before
        // it can be handed to the writable side.
        const value = switch (item) {
            .string => |text| try local.zigValueToJs(text, .{}),
            .uint8array => |bytes| try local.zigValueToJs(bytes, .{ .as_typed_array = true }),
        };
        try writable.writeChunk(value, page);
    }

    // End-of-stream: a closed readable closes its destination too.
    if (self._state == .closed) {
        try writable.closeStream(page);
    }
}
const Cancel = struct { const Cancel = struct {
callback: ?js.Function.Global = null, callback: ?js.Function.Global = null,
reason: ?[]const u8 = null, reason: ?[]const u8 = null,
@@ -251,6 +308,8 @@ pub const JsApi = struct {
pub const constructor = bridge.constructor(ReadableStream.init, .{}); pub const constructor = bridge.constructor(ReadableStream.init, .{});
pub const cancel = bridge.function(ReadableStream.cancel, .{}); pub const cancel = bridge.function(ReadableStream.cancel, .{});
pub const getReader = bridge.function(ReadableStream.getReader, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{});
pub const pipeThrough = bridge.function(ReadableStream.pipeThrough, .{});
pub const pipeTo = bridge.function(ReadableStream.pipeTo, .{});
pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{}); pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{});
pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true });
}; };