From 5997be89f618811b5cc15f133343e36545c8aa0f Mon Sep 17 00:00:00 2001 From: Muki Kiboigo Date: Thu, 4 Sep 2025 23:26:59 -0700 Subject: [PATCH] implement remaining ReadableStream functionality --- src/browser/streams/ReadableStream.zig | 199 +++++++++++++++++- .../ReadableStreamDefaultController.zig | 41 +++- .../streams/ReadableStreamDefaultReader.zig | 57 +++-- src/browser/streams/streams.zig | 2 +- 4 files changed, 271 insertions(+), 28 deletions(-) diff --git a/src/browser/streams/ReadableStream.zig b/src/browser/streams/ReadableStream.zig index a33459d6..bc8fb41b 100644 --- a/src/browser/streams/ReadableStream.zig +++ b/src/browser/streams/ReadableStream.zig @@ -30,6 +30,7 @@ const ReadableStreamDefaultController = @import("ReadableStreamDefaultController const State = union(enum) { readable, closed: ?[]const u8, + cancelled: ?[]const u8, errored: Env.JsObject, }; @@ -38,8 +39,29 @@ cancel_resolver: v8.Persistent(v8.PromiseResolver), locked: bool = false, state: State = .readable, +cancel_fn: ?Env.Function = null, +pull_fn: ?Env.Function = null, + +strategy: QueueingStrategy, +reader_resolver: ?v8.Persistent(v8.PromiseResolver) = null, queue: std.ArrayListUnmanaged([]const u8) = .empty, +pub const ReadableStreamReadResult = struct { + const ValueUnion = + union(enum) { data: []const u8, empty: void }; + + value: ValueUnion, + done: bool, + + pub fn get_value(self: *const ReadableStreamReadResult) ValueUnion { + return self.value; + } + + pub fn get_done(self: *const ReadableStreamReadResult) bool { + return self.done; + } +}; + const UnderlyingSource = struct { start: ?Env.Function = null, pull: ?Env.Function = null, @@ -49,11 +71,11 @@ const UnderlyingSource = struct { const QueueingStrategy = struct { size: ?Env.Function = null, - high_water_mark: f64 = 1.0, + high_water_mark: u32 = 1, }; -pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { - _ = strategy; +pub fn constructor(underlying: ?UnderlyingSource, _strategy: ?QueueingStrategy, page: *Page) !*ReadableStream { + const strategy: QueueingStrategy = _strategy orelse .{}; const cancel_resolver = v8.Persistent(v8.PromiseResolver).init( page.main_context.isolate, @@ -61,7 +83,7 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p ); const stream = try page.arena.create(ReadableStream); - stream.* = ReadableStream{ .cancel_resolver = cancel_resolver }; + stream.* = ReadableStream{ .cancel_resolver = cancel_resolver, .strategy = strategy }; const controller = ReadableStreamDefaultController{ .stream = stream }; @@ -70,6 +92,15 @@ pub fn constructor(underlying: ?UnderlyingSource, strategy: ?QueueingStrategy, p if (src.start) |start| { try start.call(void, .{controller}); } + + if (src.cancel) |cancel| { + stream.cancel_fn = cancel; + } + + if (src.pull) |pull| { + stream.pull_fn = pull; + try stream.pullIf(); + } } return stream; @@ -79,7 +110,7 @@ pub fn get_locked(self: *const ReadableStream) bool { return self.locked; } -pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise { +pub fn _cancel(self: *ReadableStream, reason: ?[]const u8, page: *Page) !Env.Promise { if (self.locked) { return error.TypeError; } @@ -89,9 +120,31 @@ pub fn _cancel(self: *const ReadableStream, page: *Page) !Env.Promise { .resolver = self.cancel_resolver.castToPromiseResolver(), }; + self.state = .{ .cancelled = if (reason) |r| try page.arena.dupe(u8, r) else null }; + + // Call cancel callback. + if (self.cancel_fn) |cancel| { + if (reason) |r| { + try cancel.call(void, .{r}); + } else { + try cancel.call(void, .{}); + } + } + + try resolver.resolve({}); return resolver.promise(); } +pub fn pullIf(self: *ReadableStream) !void { + if (self.pull_fn) |pull_fn| { + // Must be under the high water mark AND readable. + if ((self.queue.items.len < self.strategy.high_water_mark) and self.state == .readable) { + const controller = ReadableStreamDefaultController{ .stream = self }; + try pull_fn.call(void, .{controller}); + } + } +} + const GetReaderOptions = struct { // Mode must equal 'byob' or be undefined. RangeError otherwise. mode: ?[]const u8 = null, @@ -102,6 +155,7 @@ pub fn _getReader(self: *ReadableStream, _options: ?GetReaderOptions, page: *Pag return error.TypeError; } + // TODO: Determine if we need the ReadableStreamBYOBReader const options = _options orelse GetReaderOptions{}; _ = options; @@ -144,3 +198,138 @@ test "streams: ReadableStream" { .{ "readResult.done", "false" }, }, .{}); } + +test "streams: ReadableStream cancel and close" { + var runner = try testing.jsRunner(testing.tracking_allocator, .{ .url = "https://lightpanda.io" }); + defer runner.deinit(); + try runner.testCases(&.{ + .{ "var readResult; var cancelResult; var closeResult;", "undefined" }, + + // Test 1: Stream with controller.close() + .{ + \\ const stream1 = new ReadableStream({ + \\ start(controller) { + \\ controller.enqueue("first"); + \\ controller.enqueue("second"); + \\ controller.close(); + \\ } + \\ }); + , + undefined, + }, + .{ "const reader1 = stream1.getReader();", undefined }, + .{ + \\ (async function () { + \\ readResult = await reader1.read(); + \\ }()); + \\ false; + , + "false", + }, + .{ "readResult.value", "first" }, + .{ "readResult.done", "false" }, + + // Read second chunk + .{ + \\ (async function () { + \\ readResult = await reader1.read(); + \\ }()); + \\ false; + , + "false", + }, + .{ "readResult.value", "second" }, + .{ "readResult.done", "false" }, + + // Read after close - should get done: true + .{ + \\ (async function () { + \\ readResult = await reader1.read(); + \\ }()); + \\ false; + , + "false", + }, + .{ "readResult.value", "undefined" }, + .{ "readResult.done", "true" }, + + // Test 2: Stream with reader.cancel() + .{ + \\ const stream2 = new ReadableStream({ + \\ start(controller) { + \\ controller.enqueue("data1"); + \\ controller.enqueue("data2"); + \\ controller.enqueue("data3"); + \\ }, + \\ cancel(reason) { + \\ closeResult = `Stream cancelled: ${reason}`; + \\ } + \\ }); + , + undefined, + }, + .{ "const reader2 = stream2.getReader();", undefined }, + + // Read one chunk before canceling + .{ + \\ (async function () { + \\ readResult = await reader2.read(); + \\ }()); + \\ false; + , + "false", + }, + .{ "readResult.value", "data1" }, + .{ "readResult.done", "false" }, + + // Cancel the stream + .{ + \\ (async function () { + \\ cancelResult = await reader2.cancel("user requested"); + \\ }()); + \\ false; + , + "false", + }, + .{ "cancelResult", "undefined" }, + .{ "closeResult", "Stream cancelled: user requested" }, + + // Try to read after cancel - should throw or return done + .{ + \\ try { + \\ (async function () { + \\ readResult = await reader2.read(); + \\ }()); + \\ } catch(e) { + \\ readResult = { error: e.name }; + \\ } + \\ false; + , + "false", + }, + + // Test 3: Cancel without reason + .{ + \\ const stream3 = new ReadableStream({ + \\ start(controller) { + \\ controller.enqueue("test"); + \\ }, + \\ cancel(reason) { + \\ closeResult = reason === undefined ? "no reason" : reason; + \\ } + \\ }); + , + undefined, + }, + .{ "const reader3 = stream3.getReader();", undefined }, + .{ + \\ (async function () { + \\ await reader3.cancel(); + \\ }()); + \\ false; + , + "false", + }, + .{ "closeResult", "no reason" }, + }, .{}); +} diff --git a/src/browser/streams/ReadableStreamDefaultController.zig b/src/browser/streams/ReadableStreamDefaultController.zig index ed1ca9a9..df55c85e 100644 --- a/src/browser/streams/ReadableStreamDefaultController.zig +++ b/src/browser/streams/ReadableStreamDefaultController.zig @@ -24,6 +24,7 @@ const Env = @import("../env.zig").Env; const v8 = @import("v8"); const ReadableStream = @import("./ReadableStream.zig"); +const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult; const ReadableStreamDefaultController = @This(); @@ -38,6 +39,16 @@ pub fn _close(self: *ReadableStreamDefaultController, _reason: ?[]const u8, page const reason = if (_reason) |reason| try page.arena.dupe(u8, reason) else null; self.stream.state = .{ .closed = reason }; + if (self.stream.reader_resolver) |rr| { + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = rr.castToPromiseResolver(), + }; + + try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); + self.stream.reader_resolver = null; + } + // close just sets as closed meaning it wont READ any more but anything in the queue is fine to read. // to discard, must use cancel. } @@ -49,9 +60,37 @@ pub fn _enqueue(self: *ReadableStreamDefaultController, chunk: []const u8, page: return error.TypeError; } + if (self.stream.reader_resolver) |rr| { + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = rr.castToPromiseResolver(), + }; + + try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = chunk }, .done = false }); + self.stream.reader_resolver = null; + + // rr.setWeakFinalizer(@ptrCast(self.stream), struct { + // fn callback(info: ?*v8.c.WeakCallbackInfo) void { + // const inner_stream: *ReadableStream = @ptrCast(@alignCast(v8.c.v8__WeakCallbackInfo__GetParameter(info).?)); + // inner_stream.reader_resolver = null; + // } + // }.callback, .kParameter); + } + try self.stream.queue.append(page.arena, chunk); + try self.stream.pullIf(); } -pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject) void { +pub fn _error(self: *ReadableStreamDefaultController, err: Env.JsObject, page: *Page) !void { self.stream.state = .{ .errored = err }; + + if (self.stream.reader_resolver) |rr| { + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = rr.castToPromiseResolver(), + }; + + try resolver.reject(err); + self.stream.reader_resolver = null; + } } diff --git a/src/browser/streams/ReadableStreamDefaultReader.zig b/src/browser/streams/ReadableStreamDefaultReader.zig index 1646a855..a339fb7a 100644 --- a/src/browser/streams/ReadableStreamDefaultReader.zig +++ b/src/browser/streams/ReadableStreamDefaultReader.zig @@ -24,6 +24,7 @@ const log = @import("../../log.zig"); const Env = @import("../env.zig").Env; const Page = @import("../page.zig").Page; const ReadableStream = @import("./ReadableStream.zig"); +const ReadableStreamReadResult = @import("./ReadableStream.zig").ReadableStreamReadResult; const ReadableStreamDefaultReader = @This(); @@ -47,23 +48,10 @@ pub fn get_closed(self: *const ReadableStreamDefaultReader) Env.Promise { return self.closed_resolver.promise(); } -pub fn _cancel(self: *ReadableStreamDefaultReader, page: *Page) !Env.Promise { - return try self.stream._cancel(page); +pub fn _cancel(self: *ReadableStreamDefaultReader, reason: ?[]const u8, page: *Page) !Env.Promise { + return try self.stream._cancel(reason, page); } -pub const ReadableStreamReadResult = struct { - value: ?[]const u8, - done: bool, - - pub fn get_value(self: *const ReadableStreamReadResult) !?[]const u8 { - return self.value; - } - - pub fn get_done(self: *const ReadableStreamReadResult) bool { - return self.done; - } -}; - pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise { const stream = self.stream; @@ -76,21 +64,35 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise .readable => { if (stream.queue.items.len > 0) { const data = self.stream.queue.orderedRemove(0); - try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); + try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); } else { - // TODO: need to wait until we have more data - try resolver.reject("TODO!"); - return error.Todo; + if (self.stream.reader_resolver) |rr| { + const r_resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = rr.castToPromiseResolver(), + }; + + return r_resolver.promise(); + } else { + const p_resolver = v8.Persistent(v8.PromiseResolver).init(page.main_context.isolate, resolver.resolver); + self.stream.reader_resolver = p_resolver; + return resolver.promise(); + } + + try self.stream.pullIf(); } }, .closed => |_| { if (stream.queue.items.len > 0) { const data = self.stream.queue.orderedRemove(0); - try resolver.resolve(ReadableStreamReadResult{ .value = data, .done = false }); + try resolver.resolve(ReadableStreamReadResult{ .value = .{ .data = data }, .done = false }); } else { - try resolver.resolve(ReadableStreamReadResult{ .value = null, .done = true }); + try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); } }, + .cancelled => |_| { + try resolver.resolve(ReadableStreamReadResult{ .value = .empty, .done = true }); + }, .errored => |err| { try resolver.reject(err); }, @@ -98,3 +100,16 @@ pub fn _read(self: *const ReadableStreamDefaultReader, page: *Page) !Env.Promise return resolver.promise(); } + +pub fn _releaseLock(self: *const ReadableStreamDefaultReader, page: *Page) !void { + self.stream.locked = false; + + if (self.stream.reader_resolver) |rr| { + const resolver = Env.PromiseResolver{ + .js_context = page.main_context, + .resolver = rr.castToPromiseResolver(), + }; + + try resolver.reject("TypeError"); + } +} diff --git a/src/browser/streams/streams.zig b/src/browser/streams/streams.zig index c33f5aa6..f345640f 100644 --- a/src/browser/streams/streams.zig +++ b/src/browser/streams/streams.zig @@ -18,7 +18,7 @@ pub const Interfaces = .{ @import("ReadableStream.zig"), + @import("ReadableStream.zig").ReadableStreamReadResult, @import("ReadableStreamDefaultReader.zig"), - @import("ReadableStreamDefaultReader.zig").ReadableStreamReadResult, @import("ReadableStreamDefaultController.zig"), };