From 6ed011e2f881312de4e29497727577bab4e0e0d0 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 12:06:18 +0100 Subject: [PATCH] Add pipeThrough and pipeTo to ReadableStream Implement synchronous piping that drains queued chunks from a ReadableStream into a WritableStream. pipeThrough accepts any {readable, writable} pair (TransformStream, TextDecoderStream, etc.) and returns the readable side. pipeTo writes all chunks to a WritableStream and resolves when complete. --- .../tests/streams/transform_stream.html | 73 +++++++++++++++++++ src/browser/webapi/streams/ReadableStream.zig | 59 +++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/src/browser/tests/streams/transform_stream.html b/src/browser/tests/streams/transform_stream.html index 431a25ec..0918f1e3 100644 --- a/src/browser/tests/streams/transform_stream.html +++ b/src/browser/tests/streams/transform_stream.html @@ -81,3 +81,76 @@ testing.expectEqual(true, result2.done); })(); + + + + + + diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig index 64c1b529..a4e3703d 100644 --- a/src/browser/webapi/streams/ReadableStream.zig +++ b/src/browser/webapi/streams/ReadableStream.zig @@ -24,6 +24,7 @@ const Page = @import("../../Page.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig"); +const WritableStream = @import("WritableStream.zig"); const IS_DEBUG = @import("builtin").mode == .Debug; @@ -233,6 +234,62 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi return resolver.promise(); } +/// pipeThrough(transform) — pipes this readable stream through a transform stream, +/// returning the readable side. `transform` is a JS object with `readable` and `writable` properties. +pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*ReadableStream { + if (self.getLocked()) { + return error.ReaderLocked; + } + + if (!transform.isObject()) { + return error.InvalidArgument; + } + + const obj = transform.toObject(); + const writable_val = try obj.get("writable"); + const readable_val = try obj.get("readable"); + + const writable = try writable_val.toZig(*WritableStream); + const output_readable = try readable_val.toZig(*ReadableStream); + + // Synchronously drain queued chunks from this stream into the writable side + try self.pipeToWritable(writable, page); + + return output_readable; +} + +/// pipeTo(writable) — pipes this readable stream to a writable stream. +/// Returns a promise that resolves when piping is complete. +pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) !js.Promise { + if (self.getLocked()) { + return page.js.local.?.rejectPromise("ReadableStream is locked"); + } + + try self.pipeToWritable(destination, page); + + return page.js.local.?.resolvePromise(.{}); +} + +/// Internal: drain all queued chunks from this stream into a WritableStream. +fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void { + const local = page.js.local.?; + + // Drain all chunks from queue + while (self._controller.dequeue()) |chunk| { + // Convert chunk to a js.Value for the writable side + const js_val = switch (chunk) { + .string => |s| try local.zigValueToJs(s, .{}), + .uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }), + }; + try writable.writeChunk(js_val, page); + } + + // If the readable stream is closed, close the writable side too + if (self._state == .closed) { + try writable.closeStream(page); + } +} + const Cancel = struct { callback: ?js.Function.Global = null, reason: ?[]const u8 = null, @@ -251,6 +308,8 @@ pub const JsApi = struct { pub const constructor = bridge.constructor(ReadableStream.init, .{}); pub const cancel = bridge.function(ReadableStream.cancel, .{}); pub const getReader = bridge.function(ReadableStream.getReader, .{}); + pub const pipeThrough = bridge.function(ReadableStream.pipeThrough, .{}); + pub const pipeTo = bridge.function(ReadableStream.pipeTo, .{}); pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{}); pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true }); };