diff --git a/src/browser/tests/streams/transform_stream.html b/src/browser/tests/streams/transform_stream.html
index 431a25ec..0918f1e3 100644
--- a/src/browser/tests/streams/transform_stream.html
+++ b/src/browser/tests/streams/transform_stream.html
@@ -81,3 +81,76 @@
testing.expectEqual(true, result2.done);
})();
+
+
+
+
+
+
diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig
index 64c1b529..a4e3703d 100644
--- a/src/browser/webapi/streams/ReadableStream.zig
+++ b/src/browser/webapi/streams/ReadableStream.zig
@@ -24,6 +24,7 @@ const Page = @import("../../Page.zig");
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
const ReadableStreamDefaultController = @import("ReadableStreamDefaultController.zig");
+const WritableStream = @import("WritableStream.zig");
const IS_DEBUG = @import("builtin").mode == .Debug;
@@ -233,6 +234,62 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi
return resolver.promise();
}
+/// pipeThrough(transform) — pipes this readable stream through a transform stream,
+/// returning the readable side. `transform` is a JS object with `readable` and `writable` properties.
+pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*ReadableStream {
+ if (self.getLocked()) {
+ return error.ReaderLocked;
+ }
+
+ if (!transform.isObject()) {
+ return error.InvalidArgument;
+ }
+
+ const obj = transform.toObject();
+ const writable_val = try obj.get("writable");
+ const readable_val = try obj.get("readable");
+
+ const writable = try writable_val.toZig(*WritableStream);
+ const output_readable = try readable_val.toZig(*ReadableStream);
+
+ // Synchronously drain queued chunks from this stream into the writable side
+ try self.pipeToWritable(writable, page);
+
+ return output_readable;
+}
+
+/// pipeTo(writable) — pipes this readable stream to a writable stream.
+/// Returns a promise that resolves when piping is complete.
+pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) !js.Promise {
+ if (self.getLocked()) {
+ return page.js.local.?.rejectPromise("ReadableStream is locked");
+ }
+
+ try self.pipeToWritable(destination, page);
+
+ return page.js.local.?.resolvePromise(.{});
+}
+
+/// Internal: drain all queued chunks from this stream into a WritableStream.
+fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void {
+ const local = page.js.local.?;
+
+ // Drain all chunks from queue
+ while (self._controller.dequeue()) |chunk| {
+ // Convert chunk to a js.Value for the writable side
+ const js_val = switch (chunk) {
+ .string => |s| try local.zigValueToJs(s, .{}),
+ .uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }),
+ };
+ try writable.writeChunk(js_val, page);
+ }
+
+ // If the readable stream is closed, close the writable side too
+ if (self._state == .closed) {
+ try writable.closeStream(page);
+ }
+}
+
const Cancel = struct {
callback: ?js.Function.Global = null,
reason: ?[]const u8 = null,
@@ -251,6 +308,8 @@ pub const JsApi = struct {
pub const constructor = bridge.constructor(ReadableStream.init, .{});
pub const cancel = bridge.function(ReadableStream.cancel, .{});
pub const getReader = bridge.function(ReadableStream.getReader, .{});
+ pub const pipeThrough = bridge.function(ReadableStream.pipeThrough, .{});
+ pub const pipeTo = bridge.function(ReadableStream.pipeTo, .{});
pub const locked = bridge.accessor(ReadableStream.getLocked, null, .{});
pub const symbol_async_iterator = bridge.iterator(ReadableStream.getAsyncIterator, .{ .async = true });
};