Merge pull request #1699 from lightpanda-io/textencoder-stream-enhancements

Textencoder stream enhancements
This commit is contained in:
Pierre Tachoire
2026-03-03 08:12:07 +01:00
committed by GitHub
5 changed files with 64 additions and 93 deletions

View File

@@ -509,6 +509,7 @@ pub const Function = struct {
as_typed_array: bool = false, as_typed_array: bool = false,
null_as_undefined: bool = false, null_as_undefined: bool = false,
cache: ?Caching = null, cache: ?Caching = null,
embedded_receiver: bool = false,
// We support two ways to cache a value directly into a v8::Object. The // We support two ways to cache a value directly into a v8::Object. The
// difference between the two is like the difference between a Map // difference between the two is like the difference between a Map
@@ -579,6 +580,9 @@ pub const Function = struct {
var args: ParameterTypes(F) = undefined; var args: ParameterTypes(F) = undefined;
if (comptime opts.static) { if (comptime opts.static) {
args = try getArgs(F, 0, local, info); args = try getArgs(F, 0, local, info);
} else if (comptime opts.embedded_receiver) {
args = try getArgs(F, 1, local, info);
@field(args, "0") = @ptrCast(@alignCast(info.getData() orelse unreachable));
} else { } else {
args = try getArgs(F, 1, local, info); args = try getArgs(F, 1, local, info);
@field(args, "0") = try TaggedOpaque.fromJS(*T, info.getThis()); @field(args, "0") = try TaggedOpaque.fromJS(*T, info.getThis());

View File

@@ -539,6 +539,15 @@ fn postCompileModule(self: *Context, mod: js.Module, url: [:0]const u8, local: *
} }
} }
fn newFunctionWithData(local: *const js.Local, comptime callback: *const fn (?*const v8.FunctionCallbackInfo) callconv(.c) void, data: *anyopaque) js.Function {
const external = local.isolate.createExternal(data);
const handle = v8.v8__Function__New__DEFAULT2(local.handle, callback, @ptrCast(external)).?;
return .{
.local = local,
.handle = handle,
};
}
// == Callbacks == // == Callbacks ==
// Callback from V8, asking us to load a module. The "specifier" is // Callback from V8, asking us to load a module. The "specifier" is
// the src of the module to load. // the src of the module to load.
@@ -857,7 +866,7 @@ fn resolveDynamicModule(self: *Context, state: *DynamicModuleResolveState, modul
// last value of the module. But, for module loading, we need to // last value of the module. But, for module loading, we need to
// resolve to the module's namespace. // resolve to the module's namespace.
const then_callback = local.newFunctionWithData(struct { const then_callback = newFunctionWithData(local, struct {
pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
var c: Caller = undefined; var c: Caller = undefined;
c.initFromHandle(callback_handle); c.initFromHandle(callback_handle);
@@ -881,7 +890,7 @@ fn resolveDynamicModule(self: *Context, state: *DynamicModuleResolveState, modul
} }
}.callback, @ptrCast(state)); }.callback, @ptrCast(state));
const catch_callback = local.newFunctionWithData(struct { const catch_callback = newFunctionWithData(local, struct {
pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void { pub fn callback(callback_handle: ?*const v8.FunctionCallbackInfo) callconv(.c) void {
var c: Caller = undefined; var c: Caller = undefined;
c.initFromHandle(callback_handle); c.initFromHandle(callback_handle);

View File

@@ -82,13 +82,17 @@ pub fn createTypedArray(self: *const Local, comptime array_type: js.ArrayType, s
return .init(self, size); return .init(self, size);
} }
pub fn newFunctionWithData( pub fn newCallback(
self: *const Local, self: *const Local,
comptime callback: *const fn (?*const v8.FunctionCallbackInfo) callconv(.c) void, callback: anytype,
data: *anyopaque, data: anytype,
) js.Function { ) js.Function {
const external = self.isolate.createExternal(data); const external = self.isolate.createExternal(data);
const handle = v8.v8__Function__New__DEFAULT2(self.handle, callback, @ptrCast(external)).?; const handle = v8.v8__Function__New__DEFAULT2(self.handle, struct {
fn wrap(info_handle: ?*const js.v8.FunctionCallbackInfo) callconv(.c) void {
Caller.Function.call(@TypeOf(data), info_handle.?, callback, .{ .embedded_receiver = true });
}
}.wrap, @ptrCast(external)).?;
return .{ .local = self, .handle = handle }; return .{ .local = self, .handle = handle };
} }

View File

@@ -236,26 +236,19 @@ pub fn cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !js.Promi
/// pipeThrough(transform) — pipes this readable stream through a transform stream, /// pipeThrough(transform) — pipes this readable stream through a transform stream,
/// returning the readable side. `transform` is a JS object with `readable` and `writable` properties. /// returning the readable side. `transform` is a JS object with `readable` and `writable` properties.
pub fn pipeThrough(self: *ReadableStream, transform: js.Value, page: *Page) !*ReadableStream { const PipeTransform = struct {
writable: *WritableStream,
readable: *ReadableStream,
};
pub fn pipeThrough(self: *ReadableStream, transform: PipeTransform, page: *Page) !*ReadableStream {
if (self.getLocked()) { if (self.getLocked()) {
return error.ReaderLocked; return error.ReaderLocked;
} }
if (!transform.isObject()) {
return error.InvalidArgument;
}
const obj = transform.toObject();
const writable_val = try obj.get("writable");
const readable_val = try obj.get("readable");
const writable = try writable_val.toZig(*WritableStream);
const output_readable = try readable_val.toZig(*ReadableStream);
// Start async piping from this stream to the writable side // Start async piping from this stream to the writable side
try PipeState.startPipe(self, writable, null, page); try PipeState.startPipe(self, transform.writable, null, page);
return output_readable; return transform.readable;
} }
/// pipeTo(writable) — pipes this readable stream to a writable stream. /// pipeTo(writable) — pipes this readable stream to a writable stream.
@@ -279,7 +272,6 @@ pub fn pipeTo(self: *ReadableStream, destination: *WritableStream, page: *Page)
const PipeState = struct { const PipeState = struct {
reader: *ReadableStreamDefaultReader, reader: *ReadableStreamDefaultReader,
writable: *WritableStream, writable: *WritableStream,
page: *Page,
context_id: usize, context_id: usize,
resolver: ?js.PromiseResolver.Global, resolver: ?js.PromiseResolver.Global,
@@ -294,107 +286,69 @@ const PipeState = struct {
state.* = .{ state.* = .{
.reader = reader, .reader = reader,
.writable = writable, .writable = writable,
.page = page,
.context_id = page.js.id, .context_id = page.js.id,
.resolver = resolver, .resolver = resolver,
}; };
try state.pumpRead(page);
try state.pumpRead();
} }
fn pumpRead(state: *PipeState) !void { fn pumpRead(state: *PipeState, page: *Page) !void {
const local = state.page.js.local.?; const local = page.js.local.?;
// Call reader.read() which returns a Promise // Call reader.read() which returns a Promise
const read_promise = try state.reader.read(state.page); const read_promise = try state.reader.read(page);
// Create JS callback functions for .then() and .catch() // Create JS callback functions for .then() and .catch()
const then_fn = local.newFunctionWithData(&onReadFulfilled, state); const then_fn = local.newCallback(onReadFulfilled, state);
const catch_fn = local.newFunctionWithData(&onReadRejected, state); const catch_fn = local.newCallback(onReadRejected, state);
_ = read_promise.thenAndCatch(then_fn, catch_fn) catch { _ = read_promise.thenAndCatch(then_fn, catch_fn) catch {
state.finish(local); state.finish(local);
}; };
} }
fn onReadFulfilled(callback_handle: ?*const js.v8.FunctionCallbackInfo) callconv(.c) void { const ReadData = struct {
var c: js.Caller = undefined; done: bool,
c.initFromHandle(callback_handle); value: js.Value,
defer c.deinit(); };
fn onReadFulfilled(self: *PipeState, data_: ?ReadData, page: *Page) void {
const info = js.Caller.FunctionCallbackInfo{ .handle = callback_handle.? }; const local = page.js.local.?;
const state: *PipeState = @ptrCast(@alignCast(info.getData() orelse return)); const data = data_ orelse {
return self.finish(local);
if (state.context_id != c.local.ctx.id) return;
const l = &c.local;
defer l.runMicrotasks();
// Get the read result argument {done, value}
const result_val = info.getArg(0, l);
if (!result_val.isObject()) {
state.finish(l);
return;
}
const result_obj = result_val.toObject();
const done_val = result_obj.get("done") catch {
state.finish(l);
return;
}; };
const done = done_val.toBool();
if (done) { if (data.done) {
// Stream is finished, close the writable side // Stream is finished, close the writable side
state.writable.closeStream(state.page) catch {}; self.writable.closeStream(page) catch {};
state.finishResolve(l); self.reader.releaseLock();
if (self.resolver) |r| {
local.toLocal(r).resolve("pipeTo complete", {});
}
return; return;
} }
// Get the chunk value and write it to the writable side const value = data.value;
const chunk_val = result_obj.get("value") catch { if (value.isUndefined()) {
state.finish(l); return self.finish(local);
return; }
};
state.writable.writeChunk(chunk_val, state.page) catch { self.writable.writeChunk(value, page) catch {
state.finish(l); return self.finish(local);
return;
}; };
// Continue reading the next chunk // Continue reading the next chunk
state.pumpRead() catch { self.pumpRead(page) catch {
state.finish(l); self.finish(local);
}; };
} }
fn onReadRejected(callback_handle: ?*const js.v8.FunctionCallbackInfo) callconv(.c) void { fn onReadRejected(self: *PipeState, page: *Page) void {
var c: js.Caller = undefined; self.finish(page.js.local.?);
c.initFromHandle(callback_handle);
defer c.deinit();
const info = js.Caller.FunctionCallbackInfo{ .handle = callback_handle.? };
const state: *PipeState = @ptrCast(@alignCast(info.getData() orelse return));
if (state.context_id != c.local.ctx.id) return;
const l = &c.local;
defer l.runMicrotasks();
state.finish(l);
} }
fn finishResolve(state: *PipeState, local: *const js.Local) void { fn finish(self: *PipeState, local: *const js.Local) void {
state.reader.releaseLock(); self.reader.releaseLock();
if (state.resolver) |r| { if (self.resolver) |r| {
local.toLocal(r).resolve("pipeTo complete", {});
}
}
fn finish(state: *PipeState, local: *const js.Local) void {
state.reader.releaseLock();
if (state.resolver) |r| {
local.toLocal(r).resolve("pipe finished", {}); local.toLocal(r).resolve("pipe finished", {});
} }
} }

View File

@@ -27,7 +27,7 @@ const TransformStream = @This();
pub const DefaultController = TransformStreamDefaultController; pub const DefaultController = TransformStreamDefaultController;
pub const ZigTransformFn = *const fn (*TransformStreamDefaultController, js.Value) anyerror!void; const ZigTransformFn = *const fn (*TransformStreamDefaultController, js.Value) anyerror!void;
_readable: *ReadableStream, _readable: *ReadableStream,
_writable: *WritableStream, _writable: *WritableStream,