mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-03-22 04:34:44 +00:00
Implement async piping for ReadableStream.pipeThrough/pipeTo
Replace synchronous queue-draining approach with async promise-based
piping using V8's thenAndCatch callbacks. PipeState struct manages the
async read loop: reader.read() returns a Promise, onReadFulfilled
extracts {done, value}, writes chunks to the writable side, and
recurses via pumpRead() until the stream closes.
This commit is contained in:
@@ -20,6 +20,7 @@ const std = @import("std");
|
|||||||
const log = @import("../../../log.zig");
|
const log = @import("../../../log.zig");
|
||||||
|
|
||||||
const js = @import("../../js/js.zig");
|
const js = @import("../../js/js.zig");
|
||||||
|
const v8 = js.v8;
|
||||||
const Page = @import("../../Page.zig");
|
const Page = @import("../../Page.zig");
|
||||||
|
|
||||||
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
|
const ReadableStreamDefaultReader = @import("ReadableStreamDefaultReader.zig");
|
||||||
@@ -252,8 +253,8 @@ pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*Re
|
|||||||
const writable = try writable_val.toZig(*WritableStream);
|
const writable = try writable_val.toZig(*WritableStream);
|
||||||
const output_readable = try readable_val.toZig(*ReadableStream);
|
const output_readable = try readable_val.toZig(*ReadableStream);
|
||||||
|
|
||||||
// Synchronously drain queued chunks from this stream into the writable side
|
// Start async piping from this stream to the writable side
|
||||||
try self.pipeToWritable(writable, page);
|
try PipeState.startPipe(self, writable, null, page);
|
||||||
|
|
||||||
return output_readable;
|
return output_readable;
|
||||||
}
|
}
|
||||||
@@ -265,31 +266,159 @@ pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page)
|
|||||||
return page.js.local.?.rejectPromise("ReadableStream is locked");
|
return page.js.local.?.rejectPromise("ReadableStream is locked");
|
||||||
}
|
}
|
||||||
|
|
||||||
try self.pipeToWritable(destination, page);
|
|
||||||
|
|
||||||
return page.js.local.?.resolvePromise(.{});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Internal: drain all queued chunks from this stream into a WritableStream.
|
|
||||||
fn pipeToWritable(self: *ReadableStream, writable: *WritableStream, page: *Page) !void {
|
|
||||||
const local = page.js.local.?;
|
const local = page.js.local.?;
|
||||||
|
var pipe_resolver = local.createPromiseResolver();
|
||||||
|
const promise = pipe_resolver.promise();
|
||||||
|
const persisted_resolver = try pipe_resolver.persist();
|
||||||
|
|
||||||
// Drain all chunks from queue
|
try PipeState.startPipe(self, destination, persisted_resolver, page);
|
||||||
while (self._controller.dequeue()) |chunk| {
|
|
||||||
// Convert chunk to a js.Value for the writable side
|
|
||||||
const js_val = switch (chunk) {
|
|
||||||
.string => |s| try local.zigValueToJs(s, .{}),
|
|
||||||
.uint8array => |arr| try local.zigValueToJs(arr, .{ .as_typed_array = true }),
|
|
||||||
};
|
|
||||||
try writable.writeChunk(js_val, page);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the readable stream is closed, close the writable side too
|
return promise;
|
||||||
if (self._state == .closed) {
|
|
||||||
try writable.closeStream(page);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// State for an async pipe operation.
|
||||||
|
const PipeState = struct {
|
||||||
|
reader: *ReadableStreamDefaultReader,
|
||||||
|
writable: *WritableStream,
|
||||||
|
page: *Page,
|
||||||
|
context_id: usize,
|
||||||
|
resolver: ?js.PromiseResolver.Global,
|
||||||
|
|
||||||
|
fn startPipe(
|
||||||
|
stream: *ReadableStream,
|
||||||
|
writable: *WritableStream,
|
||||||
|
resolver: ?js.PromiseResolver.Global,
|
||||||
|
page: *Page,
|
||||||
|
) !void {
|
||||||
|
const reader = try stream.getReader(page);
|
||||||
|
const state = try page.arena.create(PipeState);
|
||||||
|
state.* = .{
|
||||||
|
.reader = reader,
|
||||||
|
.writable = writable,
|
||||||
|
.page = page,
|
||||||
|
.context_id = page.js.id,
|
||||||
|
.resolver = resolver,
|
||||||
|
};
|
||||||
|
|
||||||
|
try state.pumpRead();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pumpRead(state: *PipeState) !void {
|
||||||
|
const local = state.page.js.local.?;
|
||||||
|
|
||||||
|
// Call reader.read() which returns a Promise
|
||||||
|
const read_promise = try state.reader.read(state.page);
|
||||||
|
|
||||||
|
// Create JS callback functions for .then() and .catch()
|
||||||
|
const then_fn = newFunctionWithData(local, &onReadFulfilled, state);
|
||||||
|
const catch_fn = newFunctionWithData(local, &onReadRejected, state);
|
||||||
|
|
||||||
|
_ = read_promise.thenAndCatch(then_fn, catch_fn) catch {
|
||||||
|
state.finish(local);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn onReadFulfilled(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
|
||||||
|
const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?;
|
||||||
|
var c: js.Caller = undefined;
|
||||||
|
c.init(isolate);
|
||||||
|
defer c.deinit();
|
||||||
|
|
||||||
|
const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?;
|
||||||
|
const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data))));
|
||||||
|
|
||||||
|
if (state.context_id != c.local.ctx.id) return;
|
||||||
|
|
||||||
|
const l = &c.local;
|
||||||
|
defer l.runMicrotasks();
|
||||||
|
|
||||||
|
// Get the read result argument {done, value}
|
||||||
|
const result_val = js.Value{
|
||||||
|
.local = l,
|
||||||
|
.handle = v8.v8__FunctionCallbackInfo__INDEX(callback_handle, 0) orelse return,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!result_val.isObject()) {
|
||||||
|
state.finish(l);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const result_obj = result_val.toObject();
|
||||||
|
const done_val = result_obj.get("done") catch {
|
||||||
|
state.finish(l);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
const done = done_val.toBool();
|
||||||
|
|
||||||
|
if (done) {
|
||||||
|
// Stream is finished, close the writable side
|
||||||
|
state.writable.closeStream(state.page) catch {};
|
||||||
|
state.finishResolve(l);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the chunk value and write it to the writable side
|
||||||
|
const chunk_val = result_obj.get("value") catch {
|
||||||
|
state.finish(l);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
state.writable.writeChunk(chunk_val, state.page) catch {
|
||||||
|
state.finish(l);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Continue reading the next chunk
|
||||||
|
state.pumpRead() catch {
|
||||||
|
state.finish(l);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn onReadRejected(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
|
||||||
|
const isolate = v8.v8__FunctionCallbackInfo__GetIsolate(callback_handle).?;
|
||||||
|
var c: js.Caller = undefined;
|
||||||
|
c.init(isolate);
|
||||||
|
defer c.deinit();
|
||||||
|
|
||||||
|
const info_data = v8.v8__FunctionCallbackInfo__Data(callback_handle).?;
|
||||||
|
const state: *PipeState = @ptrCast(@alignCast(v8.v8__External__Value(@ptrCast(info_data))));
|
||||||
|
|
||||||
|
if (state.context_id != c.local.ctx.id) return;
|
||||||
|
|
||||||
|
const l = &c.local;
|
||||||
|
defer l.runMicrotasks();
|
||||||
|
|
||||||
|
state.finish(l);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finishResolve(state: *PipeState, local: *const js.Local) void {
|
||||||
|
state.reader.releaseLock();
|
||||||
|
if (state.resolver) |r| {
|
||||||
|
local.toLocal(r).resolve("pipeTo complete", {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish(state: *PipeState, local: *const js.Local) void {
|
||||||
|
state.reader.releaseLock();
|
||||||
|
if (state.resolver) |r| {
|
||||||
|
local.toLocal(r).resolve("pipe finished", {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn newFunctionWithData(
|
||||||
|
local: *const js.Local,
|
||||||
|
comptime callback: *const fn (?*const v8.FunctionCallbackInfo) callconv(.c) void,
|
||||||
|
data: *anyopaque,
|
||||||
|
) js.Function {
|
||||||
|
const external = local.isolate.createExternal(data);
|
||||||
|
const handle = v8.v8__Function__New__DEFAULT2(local.handle, callback, @ptrCast(external)).?;
|
||||||
|
return .{
|
||||||
|
.local = local,
|
||||||
|
.handle = handle,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const Cancel = struct {
|
const Cancel = struct {
|
||||||
callback: ?js.Function.Global = null,
|
callback: ?js.Function.Global = null,
|
||||||
reason: ?[]const u8 = null,
|
reason: ?[]const u8 = null,
|
||||||
|
|||||||
Reference in New Issue
Block a user