From ca0ef18bdf7f75d75c874b65f59d8ef1618c2312 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Mon, 2 Mar 2026 12:17:17 +0100 Subject: [PATCH] Implement async piping for ReadableStream.pipeThrough/pipeTo Replace synchronous queue-draining approach with async promise-based piping using V8's thenAndCatch callbacks. PipeState struct manages the async read loop: reader.read() returns a Promise, onReadFulfilled extracts {done, value}, writes chunks to the writable side, and recurses via pumpRead() until the stream closes. --- src/browser/webapi/streams/ReadableStream.zig | 173 +++++++++++++++--- 1 file changed, 151 insertions(+), 22 deletions(-) diff --git a/src/browser/webapi/streams/ReadableStream.zig b/src/browser/webapi/streams/ReadableStream.zig index a4e3703d..eba03a9e 100644 --- a/src/browser/webapi/streams/ReadableStream.zig +++ b/src/browser/webapi/streams/ReadableStream.zig @@ -20,6 +20,7 @@ const std = @import("std"); const log = @import("../../../log.zig"); const js = @import("../../js/js.zig"); +const v8 = js.v8; const Page = @import("../../Page.zig"); const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig"); @@ -252,8 +253,8 @@ pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*Re const writable = try writable_val.toZig(*WritableStream); const output_readable = try readable_val.toZig(*ReadableStream); - // Synchronously drain queued chunks from this stream into the writable side - try self.pipeToWritable(writable, page); + // Start async piping from this stream to the writable side + try PipeState.startPipe(self, writable, null, page); return output_readable; } @@ -265,31 +266,159 @@ pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page) return page.js.local.?.rejectPromise("ReadableStream is locked"); } - try self.pipeToWritable(destination, page); - - return page.js.local.?.resolvePromise(.{}); -} - -/// Internal: drain all queued chunks from this stream into a WritableStream. -fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void { const local = page.js.local.?; + var pipe_resolver = local.createPromiseResolver(); + const promise = pipe_resolver.promise(); + const persisted_resolver = try pipe_resolver.persist(); - // Drain all chunks from queue - while (self._controller.dequeue()) |chunk| { - // Convert chunk to a js.Value for the writable side - const js_val = switch (chunk) { - .string => |s| try local.zigValueToJs(s, .{}), - .uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }), - }; - try writable.writeChunk(js_val, page); - } + try PipeState.startPipe(self, destination, persisted_resolver, page); - // If the readable stream is closed, close the writable side too - if (self._state == .closed) { - try writable.closeStream(page); - } + return promise; } +/// State for an async pipe operation. +const PipeState = struct { + reader: *ReadableStreamDefaultReader, + writable: *WritableStream, + page: *Page, + context_id: usize, + resolver: ?js.PromiseResolver.Global, + + fn startPipe( + stream: *ReadableStream, + writable: *WritableStream, + resolver: ?js.PromiseResolver.Global, + page: *Page, + ) !void { + const reader = try stream.getReader(page); + const state = try page.arena.create(PipeState); + state.* = .{ + .reader = reader, + .writable = writable, + .page = page, + .context_id = page.js.id, + .resolver = resolver, + }; + + try state.pumpRead(); + } + + fn pumpRead(state: *PipeState) !void { + const local = state.page.js.local.?; + + // Call reader.read() which returns a Promise + const read_promise = try state.reader.read(state.page); + + // Create JS callback functions for .then() and .catch() + const then_fn = newFunctionWithData(local, &onReadFulfilled, state); + const catch_fn = newFunctionWithData(local, &onReadRejected, state); + + _ = read_promise.thenAndCatch(then_fn, catch_fn) catch { + state.finish(local); + }; + } + + fn onReadFulfilled(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { + const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?; + var c: js.Caller = undefined; + c.init(isolate); + defer c.deinit(); + + const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; + const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); + + if (state.context_id != c.local.ctx.id) return; + + const l = &c.local; + defer l.runMicrotasks(); + + // Get the read result argument {done, value} + const result_val = js.Value{ + .local = l, + .handle = v8.v8__FunctionCallbackInfo__INDEX(callback_handle, 0) orelse return, + }; + + if (!result_val.isObject()) { + state.finish(l); + return; + } + + const result_obj = result_val.toObject(); + const done_val = result_obj.get("done") catch { + state.finish(l); + return; + }; + const done = done_val.toBool(); + + if (done) { + // Stream is finished, close the writable side + state.writable.closeStream(state.page) catch {}; + state.finishResolve(l); + return; + } + + // Get the chunk value and write it to the writable side + const chunk_val = result_obj.get("value") catch { + state.finish(l); + return; + }; + + state.writable.writeChunk(chunk_val, state.page) catch { + state.finish(l); + return; + }; + + // Continue reading the next chunk + state.pumpRead() catch { + state.finish(l); + }; + } + + fn onReadRejected(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { + const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?; + var c: js.Caller = undefined; + c.init(isolate); + defer c.deinit(); + + const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?; + const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data)))); + + if (state.context_id != c.local.ctx.id) return; + + const l = &c.local; + defer l.runMicrotasks(); + + state.finish(l); + } + + fn finishResolve(state: *PipeState, local: *const js.Local) void { + state.reader.releaseLock(); + if (state.resolver) |r| { + local.toLocal(r).resolve("pipeTo complete", {}); + } + } + + fn finish(state: *PipeState, local: *const js.Local) void { + state.reader.releaseLock(); + if (state.resolver) |r| { + local.toLocal(r).resolve("pipe finished", {}); + } + } + + fn newFunctionWithData( + local: *const js.Local, + comptime callback: *const fn (?*const v8.FunctionCallbackInfo) callconv(.c) void, + data: *anyopaque, + ) js.Function { + const external = local.isolate.createExternal(data); + const handle = v8.v8__Function__New__DEFAULT2(local.handle, callback, @ptrCast(external)).?; + return .{ + .local = local, + .handle = handle, + }; + } +}; + const Cancel = struct { callback: ?js.Function.Global = null, reason: ?[]const u8 = null,