implement remaining ReadableStream functionality

This commit is contained in:
Muki Kiboigo
2025-09-04 23:26:59 -07:00
parent 51ee313910
commit 463440bce4
4 changed files with 271 additions and 28 deletions

View File

@@ -30,6 +30,7 @@ const ReadableStreamDefaultController = @import("ReadableStreamDefaultController
const State = union(enum) {
readable,
closed: ?[]const u8,
cancelled: ?[]const u8,
errored: Env.JsObject,
};
@@ -38,8 +39,29 @@ cancel_resolver: v8.Persistent(v8.PromiseResolver),
locked: bool = false,
state: State = .readable,
cancel_fn: ?Env.Function = null,
pull_fn: ?Env.Function = null,
strategy: QueueingStrategy,
reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null,
queue: std.ArrayListUnmanaged([]const u8) = .empty,
pub const ReadableStreamReadResult = struct {
const ValueUnion =
union(enum) { data: []const u8, empty: void };
value: ValueUnion,
done: bool,
pub fn get_value(self: *const ReadableStreamReadResult) ValueUnion {
return self.value;
}
pub fn get_done(self: *const ReadableStreamReadResult) bool {
return self.done;
}
};
const UnderlyingSource = struct {
start: ?Env.Function = null,
pull: ?Env.Function = null,
@@ -49,11 +71,11 @@ const UnderlyingSource = struct {
const QueueingStrategy = struct {
size: ?Env.Function = null,
high_water_mark: f64 = 1.0,
high_water_mark: u32 = 1,
};
pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
_ = strategy;
pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream {
const strategy: QueueingStrategy = _strategy orelse .{};
const cancel_resolver = v8.Persistent(v8.PromiseResolver).init(
page.main_context.isolate,
@@ -61,7 +83,7 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p
);
const stream = try page.arena.create(ReadableStream);
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver };
stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy };
const controller = ReadableStreamDefaultController{ .stream = stream };
@@ -70,6 +92,15 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p
if (src.start) |start| {
try start.call(void, .{controller});
}
if (src.cancel) |cancel| {
stream.cancel_fn = cancel;
}
if (src.pull) |pull| {
stream.pull_fn = pull;
try stream.pullIf();
}
}
return stream;
@@ -79,7 +110,7 @@ pub fn get_locked(self: *const ReadableStream) bool {
return self.locked;
}
pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise {
pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Promise {
if (self.locked) {
return error.TypeError;
}
@@ -89,9 +120,31 @@ pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise {
.resolver = self.cancel_resolver.castToPromiseResolver(),
};
self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null };
// Call cancel callback.
if (self.cancel_fn) |cancel| {
if (reason) |r| {
try cancel.call(void, .{r});
} else {
try cancel.call(void, .{});
}
}
try resolver.resolve({});
return resolver.promise();
}
pub fn pullIf(self: *ReadableStream) !void {
if (self.pull_fn) |pull_fn| {
// Must be under the high water mark AND readable.
if ((self.queue.items.len < self.strategy.high_water_mark) and self.state == .readable) {
const controller = ReadableStreamDefaultController{ .stream = self };
try pull_fn.call(void, .{controller});
}
}
}
const GetReaderOptions = struct {
// Mode must equal 'byob' or be undefined. RangeError otherwise.
mode: ?[]const u8 = null,
@@ -102,6 +155,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag
return error.TypeError;
}
// TODO: Determine if we need the ReadableStreamBYOBReader
const options = _options orelse GetReaderOptions{};
_ = options;
@@ -144,3 +198,138 @@ test "streams: ReadableStream" {
.{ "readResult.done", "false" },
}, .{});
}
test "streams: ReadableStream cancel and close" {
var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" });
defer runner.deinit();
try runner.testCases(&.{
.{ "var readResult; var cancelResult; var closeResult;", "undefined" },
// Test 1: Stream with controller.close()
.{
\\ const stream1 = new ReadableStream({
\\ start(controller) {
\\ controller.enqueue("first");
\\ controller.enqueue("second");
\\ controller.close();
\\ }
\\ });
,
undefined,
},
.{ "const reader1 = stream1.getReader();", undefined },
.{
\\ (async function () {
\\ readResult = await reader1.read();
\\ }());
\\ false;
,
"false",
},
.{ "readResult.value", "first" },
.{ "readResult.done", "false" },
// Read second chunk
.{
\\ (async function () {
\\ readResult = await reader1.read();
\\ }());
\\ false;
,
"false",
},
.{ "readResult.value", "second" },
.{ "readResult.done", "false" },
// Read after close - should get done: true
.{
\\ (async function () {
\\ readResult = await reader1.read();
\\ }());
\\ false;
,
"false",
},
.{ "readResult.value", "undefined" },
.{ "readResult.done", "true" },
// Test 2: Stream with reader.cancel()
.{
\\ const stream2 = new ReadableStream({
\\ start(controller) {
\\ controller.enqueue("data1");
\\ controller.enqueue("data2");
\\ controller.enqueue("data3");
\\ },
\\ cancel(reason) {
\\ closeResult = `Stream cancelled: ${reason}`;
\\ }
\\ });
,
undefined,
},
.{ "const reader2 = stream2.getReader();", undefined },
// Read one chunk before canceling
.{
\\ (async function () {
\\ readResult = await reader2.read();
\\ }());
\\ false;
,
"false",
},
.{ "readResult.value", "data1" },
.{ "readResult.done", "false" },
// Cancel the stream
.{
\\ (async function () {
\\ cancelResult = await reader2.cancel("user requested");
\\ }());
\\ false;
,
"false",
},
.{ "cancelResult", "undefined" },
.{ "closeResult", "Stream cancelled: user requested" },
// Try to read after cancel - should throw or return done
.{
\\ try {
\\ (async function () {
\\ readResult = await reader2.read();
\\ }());
\\ } catch(e) {
\\ readResult = { error: e.name };
\\ }
\\ false;
,
"false",
},
// Test 3: Cancel without reason
.{
\\ const stream3 = new ReadableStream({
\\ start(controller) {
\\ controller.enqueue("test");
\\ },
\\ cancel(reason) {
\\ closeResult = reason === undefined ? "no reason" : reason;
\\ }
\\ });
,
undefined,
},
.{ "const reader3 = stream3.getReader();", undefined },
.{
\\ (async function () {
\\ await reader3.cancel();
\\ }());
\\ false;
,
"false",
},
.{ "closeResult", "no reason" },
}, .{});
}

View File

@@ -24,6 +24,7 @@ const Env = @import("../env.zig").Env;
const v8 = @import("v8");
const ReadableStream = @import("./ReadableStream.zig");
const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult;
const ReadableStreamDefaultController = @This();
@@ -38,6 +39,16 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page
const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null;
self.stream.state = .{ .closed = reason };
if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
self.stream.reader_resolver = null;
}
// close just sets as closed meaning it wont READ any more but anything in the queue is fine to read.
// to discard, must use cancel.
}
@@ -49,9 +60,37 @@ pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page:
return error.TypeError;
}
if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false });
self.stream.reader_resolver = null;
// rr.setWeakFinalizer(@ptrCast(self.stream), struct {
// fn callback(info: ?*v8.c.WeakCallbackInfo) void {
// const inner_stream: *ReadableStream = @ptrCast(@alignCast(v8.c.v8__WeakCallbackInfo__GetParameter(info).?));
// inner_stream.reader_resolver = null;
// }
// }.callback, .kParameter);
}
try self.stream.queue.append(page.arena, chunk);
try self.stream.pullIf();
}
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void {
pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject, page: *Page) !void {
self.stream.state = .{ .errored = err };
if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
try resolver.reject(err);
self.stream.reader_resolver = null;
}
}

View File

@@ -24,6 +24,7 @@ const log = @import("../../log.zig");
const Env = @import("../env.zig").Env;
const Page = @import("../page.zig").Page;
const ReadableStream = @import("./ReadableStream.zig");
const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult;
const ReadableStreamDefaultReader = @This();
@@ -47,23 +48,10 @@ pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise {
return self.closed_resolver.promise();
}
pub fn _cancel(self: *ReadableStreamDefaultReader, page: *Page) !Env.Promise {
return try self.stream._cancel(page);
pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise {
return try self.stream._cancel(reason, page);
}
pub const ReadableStreamReadResult = struct {
value: ?[]const u8,
done: bool,
pub fn get_value(self: *const ReadableStreamReadResult) !?[]const u8 {
return self.value;
}
pub fn get_done(self: *const ReadableStreamReadResult) bool {
return self.done;
}
};
pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise {
const stream = self.stream;
@@ -76,21 +64,35 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise
.readable => {
if (stream.queue.items.len > 0) {
const data = self.stream.queue.orderedRemove(0);
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false });
} else {
// TODO: need to wait until we have more data
try resolver.reject("TODO!");
return error.Todo;
if (self.stream.reader_resolver) |rr| {
const r_resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
return r_resolver.promise();
} else {
const p_resolver = v8.Persistent(v8.PromiseResolver).init(page.main_context.isolate, resolver.resolver);
self.stream.reader_resolver = p_resolver;
return resolver.promise();
}
try self.stream.pullIf();
}
},
.closed => |_| {
if (stream.queue.items.len > 0) {
const data = self.stream.queue.orderedRemove(0);
try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false });
try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false });
} else {
try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true });
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
}
},
.cancelled => |_| {
try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true });
},
.errored => |err| {
try resolver.reject(err);
},
@@ -98,3 +100,16 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise
return resolver.promise();
}
pub fn _releaseLock(self: *const ReadableStreamDefaultReader, page: *Page) !void {
self.stream.locked = false;
if (self.stream.reader_resolver) |rr| {
const resolver = Env.PromiseResolver{
.js_context = page.main_context,
.resolver = rr.castToPromiseResolver(),
};
try resolver.reject("TypeError");
}
}

View File

@@ -18,7 +18,7 @@
pub const Interfaces = .{
@import("ReadableStream.zig"),
@import("ReadableStream.zig").ReadableStreamReadResult,
@import("ReadableStreamDefaultReader.zig"),
@import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult,
@import("ReadableStreamDefaultController.zig"),
};